diff options
Diffstat (limited to 'git/test/test_stream.py')
-rw-r--r-- | git/test/test_stream.py | 266 |
1 files changed, 133 insertions, 133 deletions
diff --git a/git/test/test_stream.py b/git/test/test_stream.py index 8d7a5f9a..7af652b7 100644 --- a/git/test/test_stream.py +++ b/git/test/test_stream.py @@ -4,24 +4,24 @@ # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Test for object db""" from lib import ( - TestBase, - DummyStream, - Sha1Writer, - make_bytes, - make_object, - fixture_path - ) + TestBase, + DummyStream, + Sha1Writer, + make_bytes, + make_object, + fixture_path + ) from git.stream import * from git.util import ( - NULL_HEX_SHA, - hex_to_bin - ) + NULL_HEX_SHA, + hex_to_bin + ) from git.util import zlib from git.typ import ( - str_blob_type - ) + str_blob_type + ) from git.db.py.loose import PureLooseObjectODB import time @@ -32,124 +32,124 @@ import os class TestStream(TestBase): - """Test stream classes""" - - data_sizes = (15, 10000, 1000*1024+512) - - def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): - """Make stream tests - the orig_stream is seekable, allowing it to be - rewound and reused - :param cdata: the data we expect to read from stream, the contents - :param rewind_stream: function called to rewind the stream to make it ready - for reuse""" - ns = 10 - assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata)) - - # read in small steps - ss = len(cdata) / ns - for i in range(ns): - data = stream.read(ss) - chunk = cdata[i*ss:(i+1)*ss] - assert data == chunk - # END for each step - rest = stream.read() - if rest: - assert rest == cdata[-len(rest):] - # END handle rest - - if isinstance(stream, DecompressMemMapReader): - assert len(stream.data()) == stream.compressed_bytes_read() - # END handle special type - - rewind_stream(stream) - - # read everything - rdata = stream.read() - assert rdata == cdata - - if isinstance(stream, DecompressMemMapReader): - assert len(stream.data()) == stream.compressed_bytes_read() - # END handle special type - - def test_decompress_reader(self): - for close_on_deletion in range(2): - for with_size in range(2): - for ds in self.data_sizes: - cdata = make_bytes(ds, randomize=False) - - # zdata = zipped actual data - # cdata = original content data - - # create reader - if with_size: - # need object data - zdata = zlib.compress(make_object(str_blob_type, cdata)) - type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) - assert size == len(cdata) - assert type == str_blob_type - - # even if we don't set the size, it will be set automatically on first read - test_reader = DecompressMemMapReader(zdata, close_on_deletion=False) - assert test_reader._s == len(cdata) - else: - # here we need content data - zdata = zlib.compress(cdata) - reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) - assert reader._s == len(cdata) - # END get reader - - self._assert_stream_reader(reader, cdata, lambda r: r.seek(0)) - - # put in a dummy stream for closing - dummy = DummyStream() - reader._m = dummy - - assert not dummy.closed - del(reader) - assert dummy.closed == close_on_deletion - # END for each datasize - # END whether size should be used - # END whether stream should be closed when deleted - - def test_sha_writer(self): - writer = Sha1Writer() - assert 2 == writer.write("hi") - assert len(writer.sha(as_hex=1)) == 40 - assert len(writer.sha(as_hex=0)) == 20 - - # make sure it does something ;) - prev_sha = writer.sha() - writer.write("hi again") - assert writer.sha() != prev_sha - - def test_compressed_writer(self): - for ds in self.data_sizes: - fd, path = tempfile.mkstemp() - ostream = FDCompressedSha1Writer(fd) - data = make_bytes(ds, randomize=False) - - # for now, just a single write, code doesn't care about chunking - assert len(data) == ostream.write(data) - ostream.close() - - # its closed already - self.failUnlessRaises(OSError, os.close, fd) - - # read everything back, compare to data we zip - fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0)) - written_data = os.read(fd, os.path.getsize(path)) - assert len(written_data) == os.path.getsize(path) - os.close(fd) - assert written_data == zlib.compress(data, 1) # best speed - - os.remove(path) - # END for each os - - def test_decompress_reader_special_case(self): - odb = PureLooseObjectODB(fixture_path('objects')) - ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b')) - - # if there is a bug, we will be missing one byte exactly ! - data = ostream.read() - assert len(data) == ostream.size - + """Test stream classes""" + + data_sizes = (15, 10000, 1000*1024+512) + + def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): + """Make stream tests - the orig_stream is seekable, allowing it to be + rewound and reused + :param cdata: the data we expect to read from stream, the contents + :param rewind_stream: function called to rewind the stream to make it ready + for reuse""" + ns = 10 + assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata)) + + # read in small steps + ss = len(cdata) / ns + for i in range(ns): + data = stream.read(ss) + chunk = cdata[i*ss:(i+1)*ss] + assert data == chunk + # END for each step + rest = stream.read() + if rest: + assert rest == cdata[-len(rest):] + # END handle rest + + if isinstance(stream, DecompressMemMapReader): + assert len(stream.data()) == stream.compressed_bytes_read() + # END handle special type + + rewind_stream(stream) + + # read everything + rdata = stream.read() + assert rdata == cdata + + if isinstance(stream, DecompressMemMapReader): + assert len(stream.data()) == stream.compressed_bytes_read() + # END handle special type + + def test_decompress_reader(self): + for close_on_deletion in range(2): + for with_size in range(2): + for ds in self.data_sizes: + cdata = make_bytes(ds, randomize=False) + + # zdata = zipped actual data + # cdata = original content data + + # create reader + if with_size: + # need object data + zdata = zlib.compress(make_object(str_blob_type, cdata)) + type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) + assert size == len(cdata) + assert type == str_blob_type + + # even if we don't set the size, it will be set automatically on first read + test_reader = DecompressMemMapReader(zdata, close_on_deletion=False) + assert test_reader._s == len(cdata) + else: + # here we need content data + zdata = zlib.compress(cdata) + reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) + assert reader._s == len(cdata) + # END get reader + + self._assert_stream_reader(reader, cdata, lambda r: r.seek(0)) + + # put in a dummy stream for closing + dummy = DummyStream() + reader._m = dummy + + assert not dummy.closed + del(reader) + assert dummy.closed == close_on_deletion + # END for each datasize + # END whether size should be used + # END whether stream should be closed when deleted + + def test_sha_writer(self): + writer = Sha1Writer() + assert 2 == writer.write("hi") + assert len(writer.sha(as_hex=1)) == 40 + assert len(writer.sha(as_hex=0)) == 20 + + # make sure it does something ;) + prev_sha = writer.sha() + writer.write("hi again") + assert writer.sha() != prev_sha + + def test_compressed_writer(self): + for ds in self.data_sizes: + fd, path = tempfile.mkstemp() + ostream = FDCompressedSha1Writer(fd) + data = make_bytes(ds, randomize=False) + + # for now, just a single write, code doesn't care about chunking + assert len(data) == ostream.write(data) + ostream.close() + + # its closed already + self.failUnlessRaises(OSError, os.close, fd) + + # read everything back, compare to data we zip + fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0)) + written_data = os.read(fd, os.path.getsize(path)) + assert len(written_data) == os.path.getsize(path) + os.close(fd) + assert written_data == zlib.compress(data, 1) # best speed + + os.remove(path) + # END for each os + + def test_decompress_reader_special_case(self): + odb = PureLooseObjectODB(fixture_path('objects')) + ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b')) + + # if there is a bug, we will be missing one byte exactly ! + data = ostream.read() + assert len(data) == ostream.size + |