"""Test for object db"""
from test.testlib import *
from lib import (
		DummyStream,
		DeriveTest, 
		Sha1Writer
	)

from git.odb import *
from git import Blob
from cStringIO import StringIO
import tempfile
import os
import zlib




class TestStream(TestBase):
	"""Test stream classes"""
	
	data_sizes = (15, 10000, 1000*1024+512)
	
	def test_streams(self):
		# test info
		sha = Blob.NULL_HEX_SHA
		s = 20
		info = OInfo(sha, Blob.type, s)
		assert info.sha == sha
		assert info.type == Blob.type
		assert info.size == s
		
		# test ostream
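		# OInfo behaves like a tuple here, so appending the raw stream and unpacking it yields an OStream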
		stream = DummyStream()
		ostream = OStream(*(info + (stream, )))
		ostream.read(15)
		stream._assert()
		assert stream.bytes == 15
		ostream.read(20)
		assert stream.bytes == 20
		
		# derive with own args
		DeriveTest(sha, Blob.type, s, stream, 'mine', myarg=3)._assert()
		
		# test istream
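		# an IStream starts out without a sha - it is assigned explicitly later, as done below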
		istream = IStream(Blob.type, s, stream)
		assert istream.sha is None
		istream.sha = sha
		assert istream.sha == sha
		
		assert len(istream.binsha) == 20
		assert len(istream.hexsha) == 40
		
		assert istream.size == s
		istream.size = s * 2
		assert istream.size == s * 2
		assert istream.type == Blob.type
		istream.type = "something"
		assert istream.type == "something"
		assert istream.stream is stream
		istream.stream = None
		assert istream.stream is None
		
		assert istream.error is None
		istream.error = Exception()
		assert isinstance(istream.error, Exception)
		
	def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
		"""Make stream tests - the orig_stream is seekable, allowing it to be 
		rewound and reused
		:param cdata: the data we expect to read from stream, the contents
		:param rewind_stream: function called to rewind the stream to make it ready
			for reuse"""
		ns = 10
		assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
		
		# read in small steps
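		# integer division (Python 2) - whatever doesn't fit into ns even chunks is read as the rest below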
		ss = len(cdata) / ns
		for i in range(ns):
			data = stream.read(ss)
			chunk = cdata[i*ss:(i+1)*ss]
			assert data == chunk
		# END for each step
		rest = stream.read()
		if rest:
			assert rest == cdata[-len(rest):]
		# END handle rest
		
		rewind_stream(stream)
		
		# read everything
		rdata = stream.read()
		assert rdata == cdata
		
	def test_decompress_reader(self):
		for close_on_deletion in range(2):
			for with_size in range(2):
				for ds in self.data_sizes:
					cdata = make_bytes(ds, randomize=False)
					
					# zdata = zipped actual data
					# cdata = original content data
					
					# create reader
					if with_size:
						# need object data
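						# make_object prepends the git loose-object header ('<type> <size>\0'), which .new() parses to recover type and size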
						zdata = zlib.compress(make_object(Blob.type, cdata))
						type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
						assert size == len(cdata)
						assert type == Blob.type
					else:
						# here we need content data
						zdata = zlib.compress(cdata)
						reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
						assert reader._s == len(cdata)
					# END get reader 
					
					def rewind(r):
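						# the reader has no public seek, so reset its decompressor and (judging by the attribute names) its bytes-read counter and window offsets directly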
						r._zip = zlib.decompressobj()
						r._br = r._cws = r._cwe = 0
						if with_size:
							r._parse_header_info()
						# END skip header
					# END make rewind func
					
					self._assert_stream_reader(reader, cdata, rewind)
					
					# put in a dummy stream for closing
					dummy = DummyStream()
					reader._m = dummy
					
					assert not dummy.closed
					del(reader)
					assert dummy.closed == close_on_deletion
				# END for each datasize
			# END whether size should be used
		# END whether stream should be closed when deleted
		
	def test_sha_writer(self):
		writer = Sha1Writer()
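		# write() returns the number of bytes written; sha() yields the hex or binary digest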
		assert 2 == writer.write("hi")
		assert len(writer.sha(as_hex=1)) == 40
		assert len(writer.sha(as_hex=0)) == 20
		
		# make sure it does something ;)
		prev_sha = writer.sha()
		writer.write("hi again")
		assert writer.sha() != prev_sha
		
	def test_compressed_writer(self):
		for ds in self.data_sizes:
			fd, path = tempfile.mkstemp()
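			# the writer zlib-compresses everything written to the descriptor while hashing the uncompressed bytes (see the comparison against zlib.compress below)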
			ostream = FDCompressedSha1Writer(fd)
			data = make_bytes(ds, randomize=False)
			
			# for now, just a single write, code doesn't care about chunking
			assert len(data) == ostream.write(data)
			ostream.close()
			# it's closed already
			self.failUnlessRaises(OSError, os.close, fd)
			
			# read everything back, compare to data we zip
			fd = os.open(path, os.O_RDONLY)
			written_data = os.read(fd, os.path.getsize(path))
			os.close(fd)
			assert written_data == zlib.compress(data, 1)	# best speed
			
			os.remove(path)
		# END for each data size