diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-04 23:06:31 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-04 23:06:31 +0200 |
commit | a243827ab3346e188e99db2f9fc1f916941c9b1a (patch) | |
tree | e0111a983c83efb616dcbbe9aa453d5c54b3b90e /test/git/test_odb.py | |
parent | 6fbb69306c0e14bacb8dcb92a89af27d3d5d631f (diff) | |
download | gitpython-a243827ab3346e188e99db2f9fc1f916941c9b1a.tar.gz |
Implemented stream tests, found a bug on the way, slowly a test-framework for streams starts to show up, but its not yet there
Diffstat (limited to 'test/git/test_odb.py')
-rw-r--r-- | test/git/test_odb.py | 202 |
1 files changed, 200 insertions, 2 deletions
diff --git a/test/git/test_odb.py b/test/git/test_odb.py index c3a03714..2f8ebd17 100644 --- a/test/git/test_odb.py +++ b/test/git/test_odb.py @@ -1,14 +1,210 @@ """Test for object db""" - from test.testlib import * from git.odb import * +from git.odb.utils import ( + to_hex_sha, + to_bin_sha + ) from git.odb.stream import Sha1Writer from git import Blob from git.errors import BadObject - from cStringIO import StringIO +import tempfile import os +import zlib + + +#{ Stream Utilities + +class DummyStream(object): + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read + +class DeriveTest(OStream): + def __init__(self, sha, type, size, stream, *args, **kwargs): + self.myarg = kwargs.pop('myarg') + self.args = args + + def _assert(self): + assert self.args + assert self.myarg + +#} END stream utilitiess + + +class TestStream(TestBase): + """Test stream classes""" + + data_sizes = (15, 10000, 1000*1024+512) + + def test_streams(self): + # test info + sha = Blob.NULL_HEX_SHA + s = 20 + info = OInfo(sha, Blob.type, s) + assert info.sha == sha + assert info.type == Blob.type + assert info.size == s + + # test ostream + stream = DummyStream() + ostream = OStream(*(info + (stream, ))) + ostream.read(15) + stream._assert() + assert stream.bytes == 15 + ostream.read(20) + assert stream.bytes == 20 + + # defaults false + assert not ostream.is_compressed() + + # derive with own args + DeriveTest(sha, Blob.type, s, stream, 'mine',myarg = 3)._assert() + + # test istream + istream = IStream(Blob.type, s, stream) + assert not istream.is_compressed() + assert istream.sha == None + istream.sha = sha + assert istream.sha == sha + + assert len(istream.binsha) == 20 + assert len(istream.hexsha) == 40 + + assert istream.size == s + istream.size = s * 2 + istream.size == s * 2 + assert istream.type == Blob.type + istream.type = "something" + assert istream.type == "something" + assert istream.stream is stream + istream.stream = None + assert istream.stream is None + + def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): + """Make stream tests - the orig_stream is seekable, allowing it to be + rewound and reused + :param cdata: the data we expect to read from stream, the contents + :param rewind_stream: function called to rewind the stream to make it ready + for reuse""" + ns = 10 + assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata)) + + # read in small steps + ss = len(cdata) / ns + for i in range(ns): + data = stream.read(ss) + chunk = cdata[i*ss:(i+1)*ss] + assert data == chunk + # END for each step + rest = stream.read() + if rest: + assert rest == cdata[-len(rest):] + # END handle rest + + rewind_stream(stream) + + # read everything + rdata = stream.read() + assert rdata == cdata + + def test_decompress_reader(self): + for close_on_deletion in range(2): + for with_size in range(2): + for ds in self.data_sizes: + cdata = make_bytes(ds, randomize=False) + + # zdata = zipped actual data + # cdata = original content data + + # create reader + if with_size: + # need object data + zdata = zlib.compress(make_object(Blob.type, cdata)) + type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) + assert size == len(cdata) + assert type == Blob.type + else: + # here we need content data + zdata = zlib.compress(cdata) + reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) + assert reader._s == len(cdata) + # END get reader + + def rewind(r): + r._zip = zlib.decompressobj() + r._br = r._cws = r._cwe = 0 + if with_size: + r._parse_header_info() + # END skip header + # END make rewind func + + self._assert_stream_reader(reader, cdata, rewind) + + # put in a dummy stream for closing + dummy = DummyStream() + reader._m = dummy + + assert not dummy.closed + del(reader) + assert dummy.closed == close_on_deletion + #zdi# + # END for each datasize + # END whether size should be used + # END whether stream should be closed when deleted + + def test_sha_writer(self): + writer = Sha1Writer() + assert 2 == writer.write("hi") + assert len(writer.sha(as_hex=1)) == 40 + assert len(writer.sha(as_hex=0)) == 20 + + # make sure it does something ;) + prev_sha = writer.sha() + writer.write("hi again") + assert writer.sha() != prev_sha + + def test_compressed_writer(self): + for ds in self.data_sizes: + fd, path = tempfile.mkstemp() + ostream = FDCompressedSha1Writer(fd) + data = make_bytes(ds, randomize=False) + + # for now, just a single write, code doesn't care about chunking + assert len(data) == ostream.write(data) + ostream.close() + # its closed already + self.failUnlessRaises(OSError, os.close, fd) + + # read everything back, compare to data we zip + fd = os.open(path, os.O_RDONLY) + written_data = os.read(fd, os.path.getsize(path)) + os.close(fd) + assert written_data == zlib.compress(data, 1) # best speed + + os.remove(path) + # END for each os + + +class TestUtils(TestBase): + def test_basics(self): + assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA + assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20 + assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA + class TestDB(TestBase): """Test the different db class implementations""" @@ -35,6 +231,8 @@ class TestDB(TestBase): assert type(prev_ostream) in ostreams or prev_ostream in ostreams istream = IStream(Blob.type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set my_istream = db.store(istream) sha = istream.sha assert my_istream is istream |