summaryrefslogtreecommitdiff
path: root/git/test/test_stream.py
diff options
context:
space:
mode:
authorCraig Northway <craig.northway@gmail.com>2014-07-25 11:26:48 +1000
committerCraig Northway <craig.northway@gmail.com>2014-07-25 19:19:22 +1000
commita23d0d8617ba3119069e610fc7b0850a17322726 (patch)
treed8bc5213dd7e7c7f0befdf65afecb13d5435f873 /git/test/test_stream.py
parent75194159abce545bfa38c3172efb42da9b0017dc (diff)
downloadgitpython-a23d0d8617ba3119069e610fc7b0850a17322726.tar.gz
Autopep8 fixes with maximum line length 120
Diffstat (limited to 'git/test/test_stream.py')
-rw-r--r--git/test/test_stream.py78
1 files changed, 38 insertions, 40 deletions
diff --git a/git/test/test_stream.py b/git/test/test_stream.py
index 7af652b7..508038d7 100644
--- a/git/test/test_stream.py
+++ b/git/test/test_stream.py
@@ -4,24 +4,24 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
from lib import (
- TestBase,
- DummyStream,
- Sha1Writer,
- make_bytes,
- make_object,
- fixture_path
- )
+ TestBase,
+ DummyStream,
+ Sha1Writer,
+ make_bytes,
+ make_object,
+ fixture_path
+)
from git.stream import *
from git.util import (
NULL_HEX_SHA,
hex_to_bin
- )
+)
from git.util import zlib
from git.typ import (
str_blob_type
- )
+)
from git.db.py.loose import PureLooseObjectODB
import time
@@ -29,13 +29,12 @@ import tempfile
import os
-
-
class TestStream(TestBase):
+
"""Test stream classes"""
-
- data_sizes = (15, 10000, 1000*1024+512)
-
+
+ data_sizes = (15, 10000, 1000 * 1024 + 512)
+
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
"""Make stream tests - the orig_stream is seekable, allowing it to be
rewound and reused
@@ -43,43 +42,43 @@ class TestStream(TestBase):
:param rewind_stream: function called to rewind the stream to make it ready
for reuse"""
ns = 10
- assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-
+ assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
+
# read in small steps
ss = len(cdata) / ns
for i in range(ns):
data = stream.read(ss)
- chunk = cdata[i*ss:(i+1)*ss]
+ chunk = cdata[i * ss:(i + 1) * ss]
assert data == chunk
# END for each step
rest = stream.read()
if rest:
assert rest == cdata[-len(rest):]
# END handle rest
-
+
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
-
+
rewind_stream(stream)
-
+
# read everything
rdata = stream.read()
assert rdata == cdata
-
+
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
-
+
def test_decompress_reader(self):
for close_on_deletion in range(2):
for with_size in range(2):
for ds in self.data_sizes:
cdata = make_bytes(ds, randomize=False)
-
+
# zdata = zipped actual data
# cdata = original content data
-
+
# create reader
if with_size:
# need object data
@@ -87,7 +86,7 @@ class TestStream(TestBase):
type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
assert size == len(cdata)
assert type == str_blob_type
-
+
# even if we don't set the size, it will be set automatically on first read
test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
assert test_reader._s == len(cdata)
@@ -96,60 +95,59 @@ class TestStream(TestBase):
zdata = zlib.compress(cdata)
reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
assert reader._s == len(cdata)
- # END get reader
-
+ # END get reader
+
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
-
+
# put in a dummy stream for closing
dummy = DummyStream()
reader._m = dummy
-
+
assert not dummy.closed
del(reader)
assert dummy.closed == close_on_deletion
# END for each datasize
# END whether size should be used
# END whether stream should be closed when deleted
-
+
def test_sha_writer(self):
writer = Sha1Writer()
assert 2 == writer.write("hi")
assert len(writer.sha(as_hex=1)) == 40
assert len(writer.sha(as_hex=0)) == 20
-
+
# make sure it does something ;)
prev_sha = writer.sha()
writer.write("hi again")
assert writer.sha() != prev_sha
-
+
def test_compressed_writer(self):
for ds in self.data_sizes:
fd, path = tempfile.mkstemp()
ostream = FDCompressedSha1Writer(fd)
data = make_bytes(ds, randomize=False)
-
+
# for now, just a single write, code doesn't care about chunking
assert len(data) == ostream.write(data)
ostream.close()
-
+
# its closed already
self.failUnlessRaises(OSError, os.close, fd)
-
+
# read everything back, compare to data we zip
- fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
+ fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
written_data = os.read(fd, os.path.getsize(path))
assert len(written_data) == os.path.getsize(path)
os.close(fd)
assert written_data == zlib.compress(data, 1) # best speed
-
+
os.remove(path)
# END for each os
-
+
def test_decompress_reader_special_case(self):
odb = PureLooseObjectODB(fixture_path('objects'))
ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
-
+
# if there is a bug, we will be missing one byte exactly !
data = ostream.read()
assert len(data) == ostream.size
-