author    Sebastian Thiel <byronimo@gmail.com>  2010-06-04 23:06:31 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-04 23:06:31 +0200
commit    a243827ab3346e188e99db2f9fc1f916941c9b1a (patch)
tree      e0111a983c83efb616dcbbe9aa453d5c54b3b90e /test/git/test_odb.py
parent    6fbb69306c0e14bacb8dcb92a89af27d3d5d631f (diff)
download  gitpython-a243827ab3346e188e99db2f9fc1f916941c9b1a.tar.gz
Implemented stream tests and found a bug on the way; slowly a test framework for streams starts to show up, but it's not there yet
Diffstat (limited to 'test/git/test_odb.py')
-rw-r--r--  test/git/test_odb.py | 202
1 file changed, 200 insertions(+), 2 deletions(-)
diff --git a/test/git/test_odb.py b/test/git/test_odb.py
index c3a03714..2f8ebd17 100644
--- a/test/git/test_odb.py
+++ b/test/git/test_odb.py
@@ -1,14 +1,210 @@
"""Test for object db"""
-
from test.testlib import *
from git.odb import *
+from git.odb.utils import (
+ to_hex_sha,
+ to_bin_sha
+ )
from git.odb.stream import Sha1Writer
from git import Blob
from git.errors import BadObject
-
from cStringIO import StringIO
+import tempfile
import os
+import zlib
+
+
+#{ Stream Utilities
+
+class DummyStream(object):
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
+
+class DeriveTest(OStream):
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
+
+#} END stream utilities
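+
+# NOTE: the utilities above assume OStream is a tuple subtype whose fields
+# (sha, type, size, stream) are set in __new__ - hence DeriveTest only
+# stores its extra arguments in __init__, while DummyStream merely records
+# whether reads and close were delegated to it.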
+
+
+class TestStream(TestBase):
+ """Test stream classes"""
+
+ data_sizes = (15, 10000, 1000*1024+512)
+
+ def test_streams(self):
+ # test info
+ sha = Blob.NULL_HEX_SHA
+ s = 20
+ info = OInfo(sha, Blob.type, s)
+ assert info.sha == sha
+ assert info.type == Blob.type
+ assert info.size == s
+
+ # test ostream
+ stream = DummyStream()
+ ostream = OStream(*(info + (stream, )))
+ ostream.read(15)
+ stream._assert()
+ assert stream.bytes == 15
+ ostream.read(20)
+ assert stream.bytes == 20
+
+ # is_compressed defaults to False
+ assert not ostream.is_compressed()
+
+ # derive with own args
+ DeriveTest(sha, Blob.type, s, stream, 'mine', myarg=3)._assert()
+
+ # test istream
+ istream = IStream(Blob.type, s, stream)
+ assert not istream.is_compressed()
+ assert istream.sha is None
+ istream.sha = sha
+ assert istream.sha == sha
+
+ assert len(istream.binsha) == 20
+ assert len(istream.hexsha) == 40
+
+ assert istream.size == s
+ istream.size = s * 2
+ assert istream.size == s * 2
+ assert istream.type == Blob.type
+ istream.type = "something"
+ assert istream.type == "something"
+ assert istream.stream is stream
+ istream.stream = None
+ assert istream.stream is None
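+
+ # Round-trip sketch (not executed here; db stands for a LooseObjectDB or
+ # similar, whose store() is assumed to set the sha on the istream):
+ # istream = IStream(Blob.type, len(data), StringIO(data))
+ # db.store(istream)
+ # assert istream.sha is not None
+ # TestDB below exercises this behaviour.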
+
+ def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
+ """Run the stream reading tests - the stream is expected to be rewindable
+ via rewind_stream, allowing it to be reused
+ :param cdata: the data we expect to read from the stream, i.e. its contents
+ :param rewind_stream: function called to rewind the stream to make it ready
+ for reuse"""
+ ns = 10
+ assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
+
+ # read in small steps
+ ss = len(cdata) / ns
+ for i in range(ns):
+ data = stream.read(ss)
+ chunk = cdata[i*ss:(i+1)*ss]
+ assert data == chunk
+ # END for each step
+ rest = stream.read()
+ if rest:
+ assert rest == cdata[-len(rest):]
+ # END handle rest
+
+ rewind_stream(stream)
+
+ # read everything
+ rdata = stream.read()
+ assert rdata == cdata
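+
+ # Usage sketch: any file-like object will do, e.g. a plain StringIO
+ # rewound via seek:
+ # self._assert_stream_reader(StringIO(cdata), cdata, lambda s: s.seek(0))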
+
+ def test_decompress_reader(self):
+ for close_on_deletion in range(2):
+ for with_size in range(2):
+ for ds in self.data_sizes:
+ cdata = make_bytes(ds, randomize=False)
+
+ # zdata = zipped actual data
+ # cdata = original content data
+
+ # create reader
+ if with_size:
+ # need object data
+ zdata = zlib.compress(make_object(Blob.type, cdata))
+ type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
+ assert size == len(cdata)
+ assert type == Blob.type
+ else:
+ # here we need content data
+ zdata = zlib.compress(cdata)
+ reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
+ assert reader._s == len(cdata)
+ # END get reader
+
+ def rewind(r):
+ # reset the reader's internals so the same compressed buffer can be
+ # read again: fresh decompressor, zero bytes read, empty cache window
+ r._zip = zlib.decompressobj()
+ r._br = r._cws = r._cwe = 0
+ if with_size:
+ r._parse_header_info()
+ # END skip header
+ # END make rewind func
+
+ self._assert_stream_reader(reader, cdata, rewind)
+
+ # put in a dummy stream for closing
+ dummy = DummyStream()
+ reader._m = dummy
+
+ assert not dummy.closed
+ del reader
+ assert dummy.closed == close_on_deletion
+ # END for each datasize
+ # END whether size should be used
+ # END whether stream should be closed when deleted
+
+ def test_sha_writer(self):
+ writer = Sha1Writer()
+ assert 2 == writer.write("hi")
+ assert len(writer.sha(as_hex=1)) == 40
+ assert len(writer.sha(as_hex=0)) == 20
+
+ # make sure it does something ;)
+ prev_sha = writer.sha()
+ writer.write("hi again")
+ assert writer.sha() != prev_sha
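+
+ # Sha1Writer is assumed to mirror hashlib's sha1 over everything written:
+ # import hashlib
+ # assert writer.sha(as_hex=1) == hashlib.sha1("hi" + "hi again").hexdigest()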
+
+ def test_compressed_writer(self):
+ for ds in self.data_sizes:
+ fd, path = tempfile.mkstemp()
+ ostream = FDCompressedSha1Writer(fd)
+ data = make_bytes(ds, randomize=False)
+
+ # for now, just a single write, code doesn't care about chunking
+ assert len(data) == ostream.write(data)
+ ostream.close()
+ # it's closed already
+ self.failUnlessRaises(OSError, os.close, fd)
+
+ # read everything back, compare to data we zip
+ fd = os.open(path, os.O_RDONLY)
+ written_data = os.read(fd, os.path.getsize(path))
+ os.close(fd)
+ assert written_data == zlib.compress(data, 1) # best speed
+
+ os.remove(path)
+ # END for each data size
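+
+ # A further check relying only on zlib's documented API would assert that
+ # the round-trip restores the input:
+ # assert zlib.decompress(written_data) == data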
+
+
+class TestUtils(TestBase):
+ def test_basics(self):
+ assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
+ assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
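+
+ # to_bin_sha/to_hex_sha are assumed to act like binascii's
+ # unhexlify/hexlify for 20-byte shas, with hex input passed through
+ # to_hex_sha unchanged, as the first assertion shows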
+
class TestDB(TestBase):
"""Test the different db class implementations"""
@@ -35,6 +231,8 @@ class TestDB(TestBase):
assert type(prev_ostream) in ostreams or prev_ostream in ostreams
istream = IStream(Blob.type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
my_istream = db.store(istream)
sha = istream.sha
assert my_istream is istream