diff options
-rw-r--r-- | lib/git/odb/fun.py | 17 | ||||
-rw-r--r-- | lib/git/odb/stream.py | 66 | ||||
-rw-r--r-- | test/git/performance/test_streams.py | 13 | ||||
-rw-r--r-- | test/git/test_odb.py | 202 | ||||
-rw-r--r-- | test/testlib/helper.py | 443 |
5 files changed, 490 insertions, 251 deletions
diff --git a/lib/git/odb/fun.py b/lib/git/odb/fun.py index 870a6f02..3321a8ea 100644 --- a/lib/git/odb/fun.py +++ b/lib/git/odb/fun.py @@ -83,26 +83,33 @@ def write_object(type, size, read, write, chunk_size=chunk_size): :param size: amount of bytes to write from source_stream :param read: read method of a stream providing the content data :param write: write method of the output stream - :param close_target_stream: if True, the target stream will be closed when + :param close_target_stream: if True, the target stream will be closed when the routine exits, even if an error is thrown :return: The actual amount of bytes written to stream, which includes the header and a trailing newline""" tbw = 0 # total num bytes written - dbw = 0 # num data bytes written # WRITE HEADER: type SP size NULL tbw += write("%s %i\0" % (type, size)) + tbw += stream_copy(read, write, size, chunk_size) + + return tbw +def stream_copy(read, write, size, chunk_size): + """Copy a stream up to size bytes using the provided read and write methods, + in chunks of chunk_size + :note: its much like stream_copy utility, but operates just using methods""" + dbw = 0 # num data bytes written + # WRITE ALL DATA UP TO SIZE while True: cs = min(chunk_size, size-dbw) data_len = write(read(cs)) dbw += data_len if data_len < cs or dbw == size: - tbw += dbw break # END check for stream end # END duplicate data - return tbw - + return dbw + #} END routines diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py index d1181382..654bcbf6 100644 --- a/lib/git/odb/stream.py +++ b/lib/git/odb/stream.py @@ -75,7 +75,7 @@ class OStream(OInfo): """:return: True if reads of this stream yield zlib compressed data. Default False :note: this does not imply anything about the actual internal storage. Hence the data could be uncompressed, but read compressed, or vice versa""" - raise False + return False #} END interface @@ -105,10 +105,12 @@ class IStream(list): #{ Interface + @property def hexsha(self): """:return: our sha, hex encoded, 40 bytes""" return to_hex_sha(self[0]) - + + @property def binsha(self): """:return: our sha as binary, 20 bytes""" return to_bin_sha(self[0]) @@ -229,10 +231,11 @@ class DecompressMemMapReader(object): and decompress it into chunks, thats all ... """ __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') - max_read_size = 512*1024 + max_read_size = 512*1024 # currently unused def __init__(self, m, close_on_deletion, size): - """Initialize with mmap for stream reading""" + """Initialize with mmap for stream reading + :param m: must be content data - use new if you have object data and no size""" self._m = m self._zip = zlib.decompressobj() self._buf = None # buffer of decompressed bytes @@ -248,32 +251,38 @@ class DecompressMemMapReader(object): self._m.close() # END handle resource freeing - @classmethod - def new(self, m, close_on_deletion=False): - """Create a new DecompressMemMapReader instance for acting as a read-only stream - This method parses the object header from m and returns the parsed - type and size, as well as the created stream instance. - :param m: memory map on which to oparate - :param close_on_deletion: if True, the memory map will be closed once we are - being deleted""" - inst = DecompressMemMapReader(m, close_on_deletion, 0) - + def _parse_header_info(self): + """If this stream contains object data, parse the header info and skip the + stream to a point where each read will yield object content + :return: parsed type_string, size""" # read header maxb = 512 # should really be enough, cgit uses 8192 I believe - inst._s = maxb - hdr = inst.read(maxb) + self._s = maxb + hdr = self.read(maxb) hdrend = hdr.find("\0") type, size = hdr[:hdrend].split(" ") size = int(size) - inst._s = size + self._s = size # adjust internal state to match actual header length that we ignore # The buffer will be depleted first on future reads - inst._br = 0 + self._br = 0 hdrend += 1 # count terminating \0 - inst._buf = StringIO(hdr[hdrend:]) - inst._buflen = len(hdr) - hdrend + self._buf = StringIO(hdr[hdrend:]) + self._buflen = len(hdr) - hdrend + + return type, size + @classmethod + def new(self, m, close_on_deletion=False): + """Create a new DecompressMemMapReader instance for acting as a read-only stream + This method parses the object header from m and returns the parsed + type and size, as well as the created stream instance. + :param m: memory map on which to oparate. It must be object data ( header + contents ) + :param close_on_deletion: if True, the memory map will be closed once we are + being deleted""" + inst = DecompressMemMapReader(m, close_on_deletion, 0) + type, size = inst._parse_header_info() return type, size, inst def read(self, size=-1): @@ -355,17 +364,22 @@ class DecompressMemMapReader(object): # needs to be as large as the uncompressed bytes we want to read. self._cws = self._cwe - len(tail) self._cwe = self._cws + size - - - indata = self._m[self._cws:self._cwe] # another copy ... :( - # get the actual window end to be sure we don't use it for computations - self._cwe = self._cws + len(indata) else: cws = self._cws self._cws = self._cwe self._cwe = cws + size - indata = self._m[self._cws:self._cwe] # ... copy it again :( # END handle tail + + + # if window is too small, make it larger so zip can decompress something + win_size = self._cwe - self._cws + if win_size < 8: + self._cwe = self._cws + 8 + # END adjust winsize + indata = self._m[self._cws:self._cwe] # another copy ... :( + + # get the actual window end to be sure we don't use it for computations + self._cwe = self._cws + len(indata) dcompdat = self._zip.decompress(indata, size) diff --git a/test/git/performance/test_streams.py b/test/git/performance/test_streams.py index 01ec9fc4..8f600cb3 100644 --- a/test/git/performance/test_streams.py +++ b/test/git/performance/test_streams.py @@ -3,13 +3,11 @@ from test.testlib import * from git.odb import * -from array import array from cStringIO import StringIO from time import time import os import sys import stat -import random import subprocess @@ -18,18 +16,11 @@ from lib import ( ) - def make_memory_file(size_in_bytes, randomize=False): """:return: tuple(size_of_stream, stream) :param randomize: try to produce a very random stream""" - actual_size = size_in_bytes / 4 - producer = xrange(actual_size) - if randomize: - producer = list(producer) - random.shuffle(producer) - # END randomize - a = array('i', producer) - return actual_size*4, StringIO(a.tostring()) + d = make_bytes(size_in_bytes, randomize) + return len(d), StringIO(d) class TestObjDBPerformance(TestBigRepoR): diff --git a/test/git/test_odb.py b/test/git/test_odb.py index c3a03714..2f8ebd17 100644 --- a/test/git/test_odb.py +++ b/test/git/test_odb.py @@ -1,14 +1,210 @@ """Test for object db""" - from test.testlib import * from git.odb import * +from git.odb.utils import ( + to_hex_sha, + to_bin_sha + ) from git.odb.stream import Sha1Writer from git import Blob from git.errors import BadObject - from cStringIO import StringIO +import tempfile import os +import zlib + + +#{ Stream Utilities + +class DummyStream(object): + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read + +class DeriveTest(OStream): + def __init__(self, sha, type, size, stream, *args, **kwargs): + self.myarg = kwargs.pop('myarg') + self.args = args + + def _assert(self): + assert self.args + assert self.myarg + +#} END stream utilitiess + + +class TestStream(TestBase): + """Test stream classes""" + + data_sizes = (15, 10000, 1000*1024+512) + + def test_streams(self): + # test info + sha = Blob.NULL_HEX_SHA + s = 20 + info = OInfo(sha, Blob.type, s) + assert info.sha == sha + assert info.type == Blob.type + assert info.size == s + + # test ostream + stream = DummyStream() + ostream = OStream(*(info + (stream, ))) + ostream.read(15) + stream._assert() + assert stream.bytes == 15 + ostream.read(20) + assert stream.bytes == 20 + + # defaults false + assert not ostream.is_compressed() + + # derive with own args + DeriveTest(sha, Blob.type, s, stream, 'mine',myarg = 3)._assert() + + # test istream + istream = IStream(Blob.type, s, stream) + assert not istream.is_compressed() + assert istream.sha == None + istream.sha = sha + assert istream.sha == sha + + assert len(istream.binsha) == 20 + assert len(istream.hexsha) == 40 + + assert istream.size == s + istream.size = s * 2 + istream.size == s * 2 + assert istream.type == Blob.type + istream.type = "something" + assert istream.type == "something" + assert istream.stream is stream + istream.stream = None + assert istream.stream is None + + def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): + """Make stream tests - the orig_stream is seekable, allowing it to be + rewound and reused + :param cdata: the data we expect to read from stream, the contents + :param rewind_stream: function called to rewind the stream to make it ready + for reuse""" + ns = 10 + assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata)) + + # read in small steps + ss = len(cdata) / ns + for i in range(ns): + data = stream.read(ss) + chunk = cdata[i*ss:(i+1)*ss] + assert data == chunk + # END for each step + rest = stream.read() + if rest: + assert rest == cdata[-len(rest):] + # END handle rest + + rewind_stream(stream) + + # read everything + rdata = stream.read() + assert rdata == cdata + + def test_decompress_reader(self): + for close_on_deletion in range(2): + for with_size in range(2): + for ds in self.data_sizes: + cdata = make_bytes(ds, randomize=False) + + # zdata = zipped actual data + # cdata = original content data + + # create reader + if with_size: + # need object data + zdata = zlib.compress(make_object(Blob.type, cdata)) + type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) + assert size == len(cdata) + assert type == Blob.type + else: + # here we need content data + zdata = zlib.compress(cdata) + reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) + assert reader._s == len(cdata) + # END get reader + + def rewind(r): + r._zip = zlib.decompressobj() + r._br = r._cws = r._cwe = 0 + if with_size: + r._parse_header_info() + # END skip header + # END make rewind func + + self._assert_stream_reader(reader, cdata, rewind) + + # put in a dummy stream for closing + dummy = DummyStream() + reader._m = dummy + + assert not dummy.closed + del(reader) + assert dummy.closed == close_on_deletion + #zdi# + # END for each datasize + # END whether size should be used + # END whether stream should be closed when deleted + + def test_sha_writer(self): + writer = Sha1Writer() + assert 2 == writer.write("hi") + assert len(writer.sha(as_hex=1)) == 40 + assert len(writer.sha(as_hex=0)) == 20 + + # make sure it does something ;) + prev_sha = writer.sha() + writer.write("hi again") + assert writer.sha() != prev_sha + + def test_compressed_writer(self): + for ds in self.data_sizes: + fd, path = tempfile.mkstemp() + ostream = FDCompressedSha1Writer(fd) + data = make_bytes(ds, randomize=False) + + # for now, just a single write, code doesn't care about chunking + assert len(data) == ostream.write(data) + ostream.close() + # its closed already + self.failUnlessRaises(OSError, os.close, fd) + + # read everything back, compare to data we zip + fd = os.open(path, os.O_RDONLY) + written_data = os.read(fd, os.path.getsize(path)) + os.close(fd) + assert written_data == zlib.compress(data, 1) # best speed + + os.remove(path) + # END for each os + + +class TestUtils(TestBase): + def test_basics(self): + assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA + assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20 + assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA + class TestDB(TestBase): """Test the different db class implementations""" @@ -35,6 +231,8 @@ class TestDB(TestBase): assert type(prev_ostream) in ostreams or prev_ostream in ostreams istream = IStream(Blob.type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set my_istream = db.store(istream) sha = istream.sha assert my_istream is istream diff --git a/test/testlib/helper.py b/test/testlib/helper.py index c9b4c2ac..457cc26c 100644 --- a/test/testlib/helper.py +++ b/test/testlib/helper.py @@ -9,227 +9,256 @@ from git import Repo, Remote, GitCommandError from unittest import TestCase import tempfile import shutil +import random +from array import array import cStringIO GIT_REPO = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +#{ Routines + def fixture_path(name): - test_dir = os.path.dirname(os.path.dirname(__file__)) - return os.path.join(test_dir, "fixtures", name) + test_dir = os.path.dirname(os.path.dirname(__file__)) + return os.path.join(test_dir, "fixtures", name) def fixture(name): - return open(fixture_path(name), 'rb').read() + return open(fixture_path(name), 'rb').read() def absolute_project_path(): - return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) - - + return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + +def make_bytes(size_in_bytes, randomize=False): + """:return: string with given size in bytes + :param randomize: try to produce a very random stream""" + actual_size = size_in_bytes / 4 + producer = xrange(actual_size) + if randomize: + producer = list(producer) + random.shuffle(producer) + # END randomize + a = array('i', producer) + return a.tostring() + + +def make_object(type, data): + """:return: bytes resembling an uncompressed object""" + odata = "blob %i\0" % len(data) + return odata + data + +#} END routines + +#{ Adapters + class StringProcessAdapter(object): - """Allows to use strings as Process object as returned by SubProcess.Popen. - Its tailored to work with the test system only""" - - def __init__(self, input_string): - self.stdout = cStringIO.StringIO(input_string) - self.stderr = cStringIO.StringIO() - - def wait(self): - return 0 - - poll = wait - + """Allows to use strings as Process object as returned by SubProcess.Popen. + Its tailored to work with the test system only""" + + def __init__(self, input_string): + self.stdout = cStringIO.StringIO(input_string) + self.stderr = cStringIO.StringIO() + + def wait(self): + return 0 + + poll = wait + +#} END adapters + +#{ Decorators def _rmtree_onerror(osremove, fullpath, exec_info): - """ - Handle the case on windows that read-only files cannot be deleted by - os.remove by setting it to mode 777, then retry deletion. - """ - if os.name != 'nt' or osremove is not os.remove: - raise - - os.chmod(fullpath, 0777) - os.remove(fullpath) + """ + Handle the case on windows that read-only files cannot be deleted by + os.remove by setting it to mode 777, then retry deletion. + """ + if os.name != 'nt' or osremove is not os.remove: + raise + + os.chmod(fullpath, 0777) + os.remove(fullpath) def with_bare_rw_repo(func): - """ - Decorator providing a specially made read-write repository to the test case - decorated with it. The test case requires the following signature:: - def case(self, rw_repo) - - The rwrepo will be a bare clone or the types rorepo. Once the method finishes, - it will be removed completely. - - Use this if you want to make purely index based adjustments, change refs, create - heads, generally operations that do not need a working tree. - """ - def bare_repo_creator(self): - repo_dir = tempfile.mktemp("bare_repo") - rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True) - prev_cwd = os.getcwd() - try: - return func(self, rw_repo) - finally: - rw_repo.git.clear_cache() - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) - # END cleanup - # END bare repo creator - bare_repo_creator.__name__ = func.__name__ - return bare_repo_creator - + """ + Decorator providing a specially made read-write repository to the test case + decorated with it. The test case requires the following signature:: + def case(self, rw_repo) + + The rwrepo will be a bare clone or the types rorepo. Once the method finishes, + it will be removed completely. + + Use this if you want to make purely index based adjustments, change refs, create + heads, generally operations that do not need a working tree. + """ + def bare_repo_creator(self): + repo_dir = tempfile.mktemp("bare_repo") + rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True) + prev_cwd = os.getcwd() + try: + return func(self, rw_repo) + finally: + rw_repo.git.clear_cache() + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) + # END cleanup + # END bare repo creator + bare_repo_creator.__name__ = func.__name__ + return bare_repo_creator + def with_rw_repo(working_tree_ref): - """ - Same as with_bare_repo, but clones the rorepo as non-bare repository, checking - out the working tree at the given working_tree_ref. - - This repository type is more costly due to the working copy checkout. - - To make working with relative paths easier, the cwd will be set to the working - dir of the repository. - """ - assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" - def argument_passer(func): - def repo_creator(self): - repo_dir = tempfile.mktemp("non_bare_repo") - rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True) - - rw_repo.head.commit = working_tree_ref - rw_repo.head.reference.checkout() - - prev_cwd = os.getcwd() - os.chdir(rw_repo.working_dir) - try: - return func(self, rw_repo) - finally: - os.chdir(prev_cwd) - rw_repo.git.clear_cache() - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) - # END cleanup - # END rw repo creator - repo_creator.__name__ = func.__name__ - return repo_creator - # END argument passer - return argument_passer - + """ + Same as with_bare_repo, but clones the rorepo as non-bare repository, checking + out the working tree at the given working_tree_ref. + + This repository type is more costly due to the working copy checkout. + + To make working with relative paths easier, the cwd will be set to the working + dir of the repository. + """ + assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): + def repo_creator(self): + repo_dir = tempfile.mktemp("non_bare_repo") + rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True) + + rw_repo.head.commit = working_tree_ref + rw_repo.head.reference.checkout() + + prev_cwd = os.getcwd() + os.chdir(rw_repo.working_dir) + try: + return func(self, rw_repo) + finally: + os.chdir(prev_cwd) + rw_repo.git.clear_cache() + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) + # END cleanup + # END rw repo creator + repo_creator.__name__ = func.__name__ + return repo_creator + # END argument passer + return argument_passer + def with_rw_and_rw_remote_repo(working_tree_ref): - """ - Same as with_rw_repo, but also provides a writable remote repository from which the - rw_repo has been forked as well as a handle for a git-daemon that may be started to - run the remote_repo. - The remote repository was cloned as bare repository from the rorepo, wheras - the rw repo has a working tree and was cloned from the remote repository. - - remote_repo has two remotes: origin and daemon_origin. One uses a local url, - the other uses a server url. The daemon setup must be done on system level - and should be an inetd service that serves tempdir.gettempdir() and all - directories in it. - - The following scetch demonstrates this:: - rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo - - The test case needs to support the following signature:: - def case(self, rw_repo, rw_remote_repo) - - This setup allows you to test push and pull scenarios and hooks nicely. - - See working dir info in with_rw_repo - """ - assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" - def argument_passer(func): - def remote_repo_creator(self): - remote_repo_dir = tempfile.mktemp("remote_repo") - repo_dir = tempfile.mktemp("remote_clone_non_bare_repo") - - rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) - rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? - rw_repo.head.commit = working_tree_ref - rw_repo.head.reference.checkout() - - # prepare for git-daemon - rw_remote_repo.daemon_export = True - - # this thing is just annoying ! - crw = rw_remote_repo.config_writer() - section = "daemon" - try: - crw.add_section(section) - except Exception: - pass - crw.set(section, "receivepack", True) - # release lock - del(crw) - - # initialize the remote - first do it as local remote and pull, then - # we change the url to point to the daemon. The daemon should be started - # by the user, not by us - d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) - d_remote.fetch() - remote_repo_url = "git://localhost%s" % remote_repo_dir - d_remote.config_writer.set('url', remote_repo_url) - - # try to list remotes to diagnoes whether the server is up - try: - rw_repo.git.ls_remote(d_remote) - except GitCommandError,e: - print str(e) - if os.name == 'nt': - raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir()) - else: - raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir()) - - # adjust working dir - prev_cwd = os.getcwd() - os.chdir(rw_repo.working_dir) - try: - return func(self, rw_repo, rw_remote_repo) - finally: - os.chdir(prev_cwd) - rw_repo.git.clear_cache() - rw_remote_repo.git.clear_cache() - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) - shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror) - # END cleanup - # END bare repo creator - remote_repo_creator.__name__ = func.__name__ - return remote_repo_creator - # END remote repo creator - # END argument parsser - - return argument_passer - - + """ + Same as with_rw_repo, but also provides a writable remote repository from which the + rw_repo has been forked as well as a handle for a git-daemon that may be started to + run the remote_repo. + The remote repository was cloned as bare repository from the rorepo, wheras + the rw repo has a working tree and was cloned from the remote repository. + + remote_repo has two remotes: origin and daemon_origin. One uses a local url, + the other uses a server url. The daemon setup must be done on system level + and should be an inetd service that serves tempdir.gettempdir() and all + directories in it. + + The following scetch demonstrates this:: + rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo + + The test case needs to support the following signature:: + def case(self, rw_repo, rw_remote_repo) + + This setup allows you to test push and pull scenarios and hooks nicely. + + See working dir info in with_rw_repo + """ + assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): + def remote_repo_creator(self): + remote_repo_dir = tempfile.mktemp("remote_repo") + repo_dir = tempfile.mktemp("remote_clone_non_bare_repo") + + rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) + rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? + rw_repo.head.commit = working_tree_ref + rw_repo.head.reference.checkout() + + # prepare for git-daemon + rw_remote_repo.daemon_export = True + + # this thing is just annoying ! + crw = rw_remote_repo.config_writer() + section = "daemon" + try: + crw.add_section(section) + except Exception: + pass + crw.set(section, "receivepack", True) + # release lock + del(crw) + + # initialize the remote - first do it as local remote and pull, then + # we change the url to point to the daemon. The daemon should be started + # by the user, not by us + d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) + d_remote.fetch() + remote_repo_url = "git://localhost%s" % remote_repo_dir + d_remote.config_writer.set('url', remote_repo_url) + + # try to list remotes to diagnoes whether the server is up + try: + rw_repo.git.ls_remote(d_remote) + except GitCommandError,e: + print str(e) + if os.name == 'nt': + raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir()) + else: + raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir()) + + # adjust working dir + prev_cwd = os.getcwd() + os.chdir(rw_repo.working_dir) + try: + return func(self, rw_repo, rw_remote_repo) + finally: + os.chdir(prev_cwd) + rw_repo.git.clear_cache() + rw_remote_repo.git.clear_cache() + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) + shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror) + # END cleanup + # END bare repo creator + remote_repo_creator.__name__ = func.__name__ + return remote_repo_creator + # END remote repo creator + # END argument parsser + + return argument_passer + +#} END decorators + class TestBase(TestCase): - """ - Base Class providing default functionality to all tests such as: - - - Utility functions provided by the TestCase base of the unittest method such as:: - self.fail("todo") - self.failUnlessRaises(...) - - - Class level repository which is considered read-only as it is shared among - all test cases in your type. - Access it using:: - self.rorepo # 'ro' stands for read-only - - The rorepo is in fact your current project's git repo. If you refer to specific - shas for your objects, be sure you choose some that are part of the immutable portion - of the project history ( to assure tests don't fail for others ). - """ - - @classmethod - def setUpAll(cls): - """ - Dynamically add a read-only repository to our actual type. This way - each test type has its own repository - """ - cls.rorepo = Repo(GIT_REPO) - - def _make_file(self, rela_path, data, repo=None): - """ - Create a file at the given path relative to our repository, filled - with the given data. Returns absolute path to created file. - """ - repo = repo or self.rorepo - abs_path = os.path.join(repo.working_tree_dir, rela_path) - fp = open(abs_path, "w") - fp.write(data) - fp.close() - return abs_path + """ + Base Class providing default functionality to all tests such as: + + - Utility functions provided by the TestCase base of the unittest method such as:: + self.fail("todo") + self.failUnlessRaises(...) + + - Class level repository which is considered read-only as it is shared among + all test cases in your type. + Access it using:: + self.rorepo # 'ro' stands for read-only + + The rorepo is in fact your current project's git repo. If you refer to specific + shas for your objects, be sure you choose some that are part of the immutable portion + of the project history ( to assure tests don't fail for others ). + """ + + @classmethod + def setUpAll(cls): + """ + Dynamically add a read-only repository to our actual type. This way + each test type has its own repository + """ + cls.rorepo = Repo(GIT_REPO) + + def _make_file(self, rela_path, data, repo=None): + """ + Create a file at the given path relative to our repository, filled + with the given data. Returns absolute path to created file. + """ + repo = repo or self.rorepo + abs_path = os.path.join(repo.working_tree_dir, rela_path) + fp = open(abs_path, "w") + fp.write(data) + fp.close() + return abs_path |