author    | Craig Northway <craig.northway@gmail.com> | 2014-07-25 11:26:48 +1000
committer | Craig Northway <craig.northway@gmail.com> | 2014-07-25 19:19:22 +1000
commit    | a23d0d8617ba3119069e610fc7b0850a17322726 (patch)
tree      | d8bc5213dd7e7c7f0befdf65afecb13d5435f873 /git/test/test_stream.py
parent    | 75194159abce545bfa38c3172efb42da9b0017dc (diff)
download  | gitpython-a23d0d8617ba3119069e610fc7b0850a17322726.tar.gz
Autopep8 fixes with maximum line length 120
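The message names the tool and the 120-column limit, but not the exact invocation. A minimal sketch of an equivalent pass over this file, assuming autopep8's Python API (the command-line form would be along the lines of autopep8 --in-place --max-line-length 120 git/test/test_stream.py; whether spacing fixes such as 1000*1024+512 -> 1000 * 1024 + 512 apply by default or need aggressive mode depends on the autopep8 version):

import autopep8

path = 'git/test/test_stream.py'
with open(path) as fh:
    source = fh.read()

# fix_code() rewrites the source per PEP 8; max_line_length mirrors the
# limit stated in the commit message.
fixed = autopep8.fix_code(source, options={'max_line_length': 120})

with open(path, 'w') as fh:
    fh.write(fixed)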
Diffstat (limited to 'git/test/test_stream.py')
-rw-r--r-- | git/test/test_stream.py | 78
1 file changed, 38 insertions, 40 deletions
diff --git a/git/test/test_stream.py b/git/test/test_stream.py
index 7af652b7..508038d7 100644
--- a/git/test/test_stream.py
+++ b/git/test/test_stream.py
@@ -4,24 +4,24 @@
 # the New BSD License: http://www.opensource.org/licenses/bsd-license.php
 """Test for object db"""
 
 from lib import (
-        TestBase,
-        DummyStream,
-        Sha1Writer,
-        make_bytes,
-        make_object,
-        fixture_path
-    )
+    TestBase,
+    DummyStream,
+    Sha1Writer,
+    make_bytes,
+    make_object,
+    fixture_path
+)
 from git.stream import *
 from git.util import (
     NULL_HEX_SHA,
     hex_to_bin
-    )
+)
 
 from git.util import zlib
 from git.typ import (
     str_blob_type
-    )
+)
 
 from git.db.py.loose import PureLooseObjectODB
 import time
@@ -29,13 +29,12 @@ import tempfile
 import os
 
 
-
-
 class TestStream(TestBase):
+
     """Test stream classes"""
-
-    data_sizes = (15, 10000, 1000*1024+512)
-
+
+    data_sizes = (15, 10000, 1000 * 1024 + 512)
+
     def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
         """Make stream tests - the orig_stream is seekable, allowing it to be rewound
         and reused
@@ -43,43 +42,43 @@ class TestStream(TestBase):
         :param rewind_stream: function called to rewind the stream to make it ready
             for reuse"""
         ns = 10
-        assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-
+        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
+
         # read in small steps
         ss = len(cdata) / ns
         for i in range(ns):
             data = stream.read(ss)
-            chunk = cdata[i*ss:(i+1)*ss]
+            chunk = cdata[i * ss:(i + 1) * ss]
             assert data == chunk
         # END for each step
         rest = stream.read()
         if rest:
             assert rest == cdata[-len(rest):]
         # END handle rest
-
+
         if isinstance(stream, DecompressMemMapReader):
             assert len(stream.data()) == stream.compressed_bytes_read()
         # END handle special type
-
+
         rewind_stream(stream)
-
+
         # read everything
         rdata = stream.read()
         assert rdata == cdata
-
+
         if isinstance(stream, DecompressMemMapReader):
             assert len(stream.data()) == stream.compressed_bytes_read()
         # END handle special type
-
+
     def test_decompress_reader(self):
         for close_on_deletion in range(2):
             for with_size in range(2):
                 for ds in self.data_sizes:
                     cdata = make_bytes(ds, randomize=False)
-
+
                     # zdata = zipped actual data
                     # cdata = original content data
-
+
                     # create reader
                     if with_size:
                         # need object data
@@ -87,7 +86,7 @@ class TestStream(TestBase):
                         type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                         assert size == len(cdata)
                         assert type == str_blob_type
-
+
                         # even if we don't set the size, it will be set automatically on first read
                         test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                         assert test_reader._s == len(cdata)
@@ -96,60 +95,59 @@ class TestStream(TestBase):
                         zdata = zlib.compress(cdata)
                         reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                         assert reader._s == len(cdata)
-                    # END get reader 
-
+                    # END get reader
+
                     self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
-
+
                     # put in a dummy stream for closing
                     dummy = DummyStream()
                     reader._m = dummy
-
+
                     assert not dummy.closed
                     del(reader)
                     assert dummy.closed == close_on_deletion
                 # END for each datasize
             # END whether size should be used
         # END whether stream should be closed when deleted
-
+
     def test_sha_writer(self):
         writer = Sha1Writer()
         assert 2 == writer.write("hi")
         assert len(writer.sha(as_hex=1)) == 40
         assert len(writer.sha(as_hex=0)) == 20
-
+
         # make sure it does something ;)
         prev_sha = writer.sha()
         writer.write("hi again")
         assert writer.sha() != prev_sha
-
+
     def test_compressed_writer(self):
         for ds in self.data_sizes:
             fd, path = tempfile.mkstemp()
             ostream = FDCompressedSha1Writer(fd)
             data = make_bytes(ds, randomize=False)
-
+
             # for now, just a single write, code doesn't care about chunking
             assert len(data) == ostream.write(data)
             ostream.close()
-
+
             # its closed already
             self.failUnlessRaises(OSError, os.close, fd)
-
+
             # read everything back, compare to data we zip
-            fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
+            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
             written_data = os.read(fd, os.path.getsize(path))
             assert len(written_data) == os.path.getsize(path)
             os.close(fd)
             assert written_data == zlib.compress(data, 1)  # best speed
-
+
             os.remove(path)
         # END for each os
-
+
     def test_decompress_reader_special_case(self):
         odb = PureLooseObjectODB(fixture_path('objects'))
         ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
-
+
         # if there is a bug, we will be missing one byte exactly !
         data = ostream.read()
         assert len(data) == ostream.size
-
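For orientation, a condensed sketch of the round trip the decompress-reader tests above assert, restricted to names that appear in this diff; the git.stream and git.util module paths are this branch's layout, and the literal test data stands in for the make_bytes() helper:

from git.stream import DecompressMemMapReader
from git.util import zlib

cdata = "x" * 1024  # stand-in for make_bytes(ds, randomize=False)
zdata = zlib.compress(cdata)

# pass the decompressed size up front, as the else-branch of
# test_decompress_reader does
reader = DecompressMemMapReader(zdata, False, len(cdata))
assert reader.read() == cdata

# every compressed byte must have been consumed; the special-case test
# above guards against coming up exactly one byte short
assert len(reader.data()) == reader.compressed_bytes_read()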