author     Sebastian Thiel <byronimo@gmail.com>    2010-06-12 17:20:43 +0200
committer  Sebastian Thiel <byronimo@gmail.com>    2010-06-12 17:20:43 +0200
commit     6c1faef799095f3990e9970bc2cb10aa0221cf9c (patch)
tree       a79a69fa46df39eb794875f374b74110dfe091e1 /test/git/odb/test_stream.py
parent     86ea63504f3e8a74cfb1d533be9d9602d2d17e27 (diff)
download   gitpython-6c1faef799095f3990e9970bc2cb10aa0221cf9c.tar.gz
Removed odb from the project; it is now used as a submodule named gitdb, which was added instead.
Adjusted all imports to deal with the changed package names.
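The import adjustment described above amounts to pointing the old in-tree git.odb imports at the new gitdb submodule. The following is a minimal sketch of that kind of change; the gitdb module layout and type token shown are assumptions rather than details taken from this commit, so treat them as illustrative only.

    # Before the change, the removed test pulled stream types from the in-tree package:
    #   from git.odb import *
    # After the change, the same names are expected to come from the gitdb submodule
    # (assumed top-level exports; verify against the gitdb version actually vendored).
    from io import BytesIO  # the original test used cStringIO under Python 2
    from gitdb import IStream

    data = b"hello"
    # IStream(type, size, stream) mirrors the constructor exercised by the deleted test;
    # newer gitdb releases may expect the byte string b"blob" as the type token.
    istream = IStream("blob", len(data), BytesIO(data))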
Diffstat (limited to 'test/git/odb/test_stream.py')
-rw-r--r--    test/git/odb/test_stream.py    172
1 file changed, 0 insertions(+), 172 deletions(-)
diff --git a/test/git/odb/test_stream.py b/test/git/odb/test_stream.py
deleted file mode 100644
index 020fe6bd..00000000
--- a/test/git/odb/test_stream.py
+++ /dev/null
@@ -1,172 +0,0 @@
-"""Test for object db"""
-from test.testlib import *
-from lib import (
- DummyStream,
- DeriveTest,
- Sha1Writer
- )
-
-from git.odb import *
-from git import Blob
-from cStringIO import StringIO
-import tempfile
-import os
-import zlib
-
-
-
-
-class TestStream(TestBase):
- """Test stream classes"""
-
- data_sizes = (15, 10000, 1000*1024+512)
-
- def test_streams(self):
- # test info
- sha = Blob.NULL_HEX_SHA
- s = 20
- info = OInfo(sha, Blob.type, s)
- assert info.sha == sha
- assert info.type == Blob.type
- assert info.size == s
-
- # test ostream
- stream = DummyStream()
- ostream = OStream(*(info + (stream, )))
- ostream.read(15)
- stream._assert()
- assert stream.bytes == 15
- ostream.read(20)
- assert stream.bytes == 20
-
- # derive with own args
- DeriveTest(sha, Blob.type, s, stream, 'mine', myarg=3)._assert()
-
- # test istream
- istream = IStream(Blob.type, s, stream)
- assert istream.sha == None
- istream.sha = sha
- assert istream.sha == sha
-
- assert len(istream.binsha) == 20
- assert len(istream.hexsha) == 40
-
- assert istream.size == s
- istream.size = s * 2
- assert istream.size == s * 2
- assert istream.type == Blob.type
- istream.type = "something"
- assert istream.type == "something"
- assert istream.stream is stream
- istream.stream = None
- assert istream.stream is None
-
- assert istream.error is None
- istream.error = Exception()
- assert isinstance(istream.error, Exception)
-
- def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
- """Make stream tests - the orig_stream is seekable, allowing it to be
- rewound and reused
- :param cdata: the data we expect to read from stream, the contents
- :param rewind_stream: function called to rewind the stream to make it ready
- for reuse"""
- ns = 10
- assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-
- # read in small steps
- ss = len(cdata) / ns
- for i in range(ns):
- data = stream.read(ss)
- chunk = cdata[i*ss:(i+1)*ss]
- assert data == chunk
- # END for each step
- rest = stream.read()
- if rest:
- assert rest == cdata[-len(rest):]
- # END handle rest
-
- rewind_stream(stream)
-
- # read everything
- rdata = stream.read()
- assert rdata == cdata
-
- def test_decompress_reader(self):
- for close_on_deletion in range(2):
- for with_size in range(2):
- for ds in self.data_sizes:
- cdata = make_bytes(ds, randomize=False)
-
- # zdata = zipped actual data
- # cdata = original content data
-
- # create reader
- if with_size:
- # need object data
- zdata = zlib.compress(make_object(Blob.type, cdata))
- type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
- assert size == len(cdata)
- assert type == Blob.type
- else:
- # here we need content data
- zdata = zlib.compress(cdata)
- reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
- assert reader._s == len(cdata)
- # END get reader
-
- def rewind(r):
- r._zip = zlib.decompressobj()
- r._br = r._cws = r._cwe = 0
- if with_size:
- r._parse_header_info()
- # END skip header
- # END make rewind func
-
- self._assert_stream_reader(reader, cdata, rewind)
-
- # put in a dummy stream for closing
- dummy = DummyStream()
- reader._m = dummy
-
- assert not dummy.closed
- del(reader)
- assert dummy.closed == close_on_deletion
- #zdi#
- # END for each datasize
- # END whether size should be used
- # END whether stream should be closed when deleted
-
- def test_sha_writer(self):
- writer = Sha1Writer()
- assert 2 == writer.write("hi")
- assert len(writer.sha(as_hex=1)) == 40
- assert len(writer.sha(as_hex=0)) == 20
-
- # make sure it does something ;)
- prev_sha = writer.sha()
- writer.write("hi again")
- assert writer.sha() != prev_sha
-
- def test_compressed_writer(self):
- for ds in self.data_sizes:
- fd, path = tempfile.mkstemp()
- ostream = FDCompressedSha1Writer(fd)
- data = make_bytes(ds, randomize=False)
-
- # for now, just a single write, code doesn't care about chunking
- assert len(data) == ostream.write(data)
- ostream.close()
- # it's closed already
- self.failUnlessRaises(OSError, os.close, fd)
-
- # read everything back, compare to data we zip
- fd = os.open(path, os.O_RDONLY)
- written_data = os.read(fd, os.path.getsize(path))
- os.close(fd)
- assert written_data == zlib.compress(data, 1) # best speed
-
- os.remove(path)
- # END for each data size
-
-