summaryrefslogtreecommitdiff
path: root/lib/git/odb/stream.py
blob: d1181382376b69037709126b2eed7e3115d17ab9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
import zlib
from cStringIO import StringIO
from git.utils import make_sha
import errno

from utils import (
		to_hex_sha,
		to_bin_sha, 
		write, 
		close
	)

# Names exported by a star-import of this module
__all__ = ('OInfo', 'OStream', 'IStream', 'InvalidOInfo', 'InvalidOStream', 
			'DecompressMemMapReader', 'FDCompressedSha1Writer')


# ZLIB configuration
# Compression level handed to zlib.compressobj when objects are written.
# zlib accepts levels from 1 ( fastest ) to 9 ( slowest, best compression )
Z_BEST_SPEED = 1


#{ ODB Bases

class OInfo(tuple):
	"""Immutable record describing an object in an ODB: the sha of the object, 
	its type string and its uncompressed size in bytes.
	
	Every field is reachable by index as well as by attribute name::
	
		assert dbi[0] == dbi.sha
		assert dbi[1] == dbi.type
		assert dbi[2] == dbi.size
	
	The type is a plain tuple without a per-instance dictionary to keep it as 
	lightweight as possible."""
	__slots__ = ()
	
	def __new__(cls, sha, type, size):
		fields = (sha, type, size)
		return super(OInfo, cls).__new__(cls, fields)
	
	def __init__(self, *args):
		# the fields were already stored by __new__, nothing is forwarded
		super(OInfo, self).__init__()
	
	#{ Interface 
	@property
	def sha(self):
		""":return: sha of the object, the first field"""
		return self[0]
		
	@property
	def type(self):
		""":return: type string of the object, the second field"""
		return self[1]
		
	@property
	def size(self):
		""":return: uncompressed size of the object in bytes, the third field"""
		return self[2]
	#} END interface


class OStream(OInfo):
	"""Base for object streams retrieved from the database. In addition to the 
	sha, type and size fields of OInfo it carries the stream to read the object's
	data from as fourth field.
	Generally, ODB streams are read-only as objects are immutable"""
	__slots__ = tuple()
	
	def __new__(cls, sha, type, size, stream, *args, **kwargs):
		"""Helps with the initialization of subclasses: additional positional and 
		keyword arguments are accepted, but not stored in the tuple"""
		return tuple.__new__(cls, (sha, type, size, stream))
	
	
	def __init__(self, *args, **kwargs):
		# all fields were already stored by __new__, nothing is forwarded
		tuple.__init__(self)
	#{ Interface 
	
	def is_compressed(self):
		""":return: True if reads of this stream yield zlib compressed data. Default False
		:note: this does not imply anything about the actual internal storage.
			Hence the data could be uncompressed, but read compressed, or vice versa"""
		# this used to 'raise False', which fails with a TypeError as False is 
		# not an exception
		return False
		
	#} END interface
	
	#{ Stream Reader Interface 
	
	def read(self, size=-1):
		"""Read from the wrapped stream
		:param size: passed on unchanged to the read method of the stream
		:return: whatever the read method of the wrapped stream returns"""
		return self[3].read(size)
		
	#} END stream reader interface


class IStream(list):
	"""Represents an input content stream to be fed into the ODB. It is mutable to allow 
	the ODB to record information about the operations outcome right in this instance.
	
	It provides interfaces for the OStream and a StreamReader to allow the instance
	to blend in without prior conversion.
	
	The items of the list are, in order: sha, type, size, stream, compressed flag
	and error.
	
	The only method your content stream must support is 'read'"""
	__slots__ = tuple()
	
	def __new__(cls, type, size, stream, sha=None, compressed=False):
		# the items are filled in by __init__, list.__new__ only allocates
		return list.__new__(cls)
		
	def __init__(self, type, size, stream, sha=None, compressed=False):
		"""Initialize the instance
		:param type: type string of the object
		:param size: size of the data provided by stream
		:param stream: object with a read method providing the content
		:param sha: sha of the object, or None if it is not known yet
		:param compressed: value to be returned by is_compressed. It defaults to 
			False like in __new__ - it used to default to None here, which made 
			is_compressed return None instead of a bool"""
		list.__init__(self, (sha, type, size, stream, compressed, None))
	
	#{ Interface 
	
	def hexsha(self):
		""":return: our sha, hex encoded, 40 bytes"""
		return to_hex_sha(self[0])
	
	def binsha(self):
		""":return: our sha as binary, 20 bytes"""
		return to_bin_sha(self[0])
		
	def _error(self):
		""":return: the error that occurred when processing the stream, or None"""
		return self[5]
		
	def _set_error(self, exc):
		"""Set this input stream to the given exc, may be None to reset the error"""
		self[5] = exc
			
	error = property(_error, _set_error)
	
	#} END interface
	
	#{ Stream Reader Interface
	
	def read(self, size=-1):
		"""Implements a simple stream reader interface, passing the read call on 
			to our internal stream"""
		return self[3].read(size)
		
	#} END stream reader interface 
	
	#{ odb info interface
	# Each property reads and writes the list item of the respective field
	
	def _set_sha(self, sha):
		self[0] = sha
		
	def _sha(self):
		return self[0]
		
	sha = property(_sha, _set_sha)
	
	
	def _type(self):
		return self[1]
	
	def _set_type(self, type):
		self[1] = type
		
	type = property(_type, _set_type)
	
	def _size(self):
		return self[2]
		
	def _set_size(self, size):
		self[2] = size
	
	size = property(_size, _set_size)
	
	def _stream(self):
		return self[3]
		
	def _set_stream(self, stream):
		self[3] = stream
	
	stream = property(_stream, _set_stream)
	
	#} END odb info interface 
	
	#{ OStream interface 
	
	def is_compressed(self):
		""":return: the compressed flag given at construction time"""
		return self[4]
		
	#} END OStream interface
		

class InvalidOInfo(tuple):
	"""Carries information about a sha identifying an object which is invalid in 
	the queried database. The exception attribute provides more information about
	the cause of the issue.
	
	The items of the tuple are the sha and the exception, in that order"""
	__slots__ = tuple()
	
	def __new__(cls, sha, exc):
		return tuple.__new__(cls, (sha, exc))
		
	def __init__(self, sha, exc):
		# The fields were already stored by __new__. Nothing must be forwarded as 
		# tuple inherits object.__init__, which warns about or rejects additional 
		# arguments depending on the python version
		tuple.__init__(self)
	
	@property
	def sha(self):
		""":return: sha of the invalid object"""
		return self[0]
		
	@property
	def error(self):
		""":return: exception instance explaining the failure"""
		return self[1]


class InvalidOStream(InvalidOInfo):
	"""Counterpart of InvalidOInfo for an invalid ODB stream. It declares no 
	fields or methods of its own"""
	__slots__ = ()
	
#} END ODB Bases


#{ RO Streams

class DecompressMemMapReader(object):
	"""Reads data in chunks from a memory map and decompresses it. The client sees 
	only the uncompressed data, respective file-like read calls are handling on-demand
	buffered decompression accordingly
	
	A constraint on the total size of bytes is activated, simulating 
	a logical file within a possibly larger physical memory area
	
	To read efficiently, you clearly don't want to read individual bytes, instead, 
	read a few kilobytes at least.
	
	:note: The chunk-size should be carefully selected as it will involve quite a bit 
		of string copying due to the way the zlib is implemented. Its very wasteful, 
		hence we try to find a good tradeoff between allocation time and number of 
		times we actually allocate. An own zlib implementation would be good here
		to better support streamed reading - it would only need to keep the mmap
		and decompress it into chunks, thats all ... """
	__slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
	
	# NOTE: not used by read(), which deliberately does not split large requests
	max_read_size = 512*1024
	
	def __init__(self, m, close_on_deletion, size):
		"""Initialize with mmap for stream reading
		:param m: memory map with the zlib compressed data. It needs to support 
			slicing, and a close method if close_on_deletion is True
		:param close_on_deletion: if True, m is closed once this instance is deleted
		:param size: total amount of uncompressed bytes that may be read"""
		self._m = m
		self._zip = zlib.decompressobj()
		self._buf = None						# buffer of decompressed bytes
		self._buflen = 0						# length of bytes in buffer
		self._s = size							# size of uncompressed data to read in total
		self._br = 0							# num uncompressed bytes read
		self._cws = 0							# start byte of compression window
		self._cwe = 0							# end byte of compression window
		self._close = close_on_deletion			# close the memmap on deletion ?
		
	def __del__(self):
		if self._close:
			self._m.close()
		# END handle resource freeing
		
	@classmethod
	def new(cls, m, close_on_deletion=False):
		"""Create a new DecompressMemMapReader instance for acting as a read-only stream
		This method parses the object header from m and returns the parsed 
		type and size, as well as the created stream instance.
		:param m: memory map on which to operate
		:param close_on_deletion: if True, the memory map will be closed once we are 
			being deleted
		:return: tuple(type, size, stream) with the type string and the size parsed
			from the header, and the instance positioned right after the header
		:raise ValueError: if the header is not terminated by a null byte within 
			the bytes read for it"""
		inst = cls(m, close_on_deletion, 0)
		
		# read header
		maxb = 512				# should really be enough, cgit uses 8192 I believe
		inst._s = maxb
		hdr = inst.read(maxb)
		hdrend = hdr.find("\0")
		if hdrend < 0:
			# without this check, the -1 would silently be used as slice index
			raise ValueError("Object header is not terminated within the first %i bytes" % maxb)
		# END handle missing terminator
		type, size = hdr[:hdrend].split(" ")
		size = int(size)
		inst._s = size
		
		# adjust internal state to match actual header length that we ignore
		# The buffer will be depleted first on future reads
		inst._br = 0
		hdrend += 1									# count terminating \0
		inst._buf = StringIO(hdr[hdrend:])
		inst._buflen = len(hdr) - hdrend
		
		return type, size, inst
		
	def read(self, size=-1):
		"""Read uncompressed data
		:param size: maximum amount of bytes to read. If smaller than 1, everything 
			up to the total size of the stream is requested
		:return: string with uncompressed data. It is empty once the total size 
			has been read.
		:note: a single call hands at most size compressed bytes to zlib, hence 
			the result can be shorter than requested even though the stream is 
			not depleted yet"""
		remaining = self._s - self._br
		if size < 1:
			size = remaining
		else:
			size = min(size, remaining)
		# END clamp size
		
		if size == 0:
			return str()
		# END handle depletion
		
		# NOTE: Large reads are deliberately not split into chunks - if the user 
		# thinks its best, he is right. If he has trouble, he will start reading 
		# in chunks. According to the original author's tests its still faster to 
		# read 10 Mb at once instead of chunking it.
		
		# deplete the buffer, then just continue using the decompress object 
		# which has an own buffer. We just need this to transparently parse the 
		# header from the zlib stream
		dat = str()
		if self._buf:
			if self._buflen >= size:
				# have enough data
				dat = self._buf.read(size)
				self._buflen -= size
				self._br += size
				return dat
			else:
				dat = self._buf.read()		# ouch, duplicates data
				size -= self._buflen
				self._br += self._buflen
				
				self._buflen = 0
				self._buf = None
			# END handle buffer len
		# END handle buffer
		
		# decompress some data
		# Abstract: zlib needs to operate on chunks of our memory map ( which may 
		# be large ), as it will otherwise and always fill in the 'unconsumed_tail'
		# attribute which possibly reads our whole map to the end, forcing 
		# everything to be read from disk even though just a portion was requested.
		# As this would be a nogo, we workaround it by passing only chunks of data, 
		# moving the window into the memory map along as we decompress, which keeps 
		# the tail smaller than our chunk-size. This causes 'only' the chunk to be
		# copied once, and another copy of a part of it when it creates the unconsumed
		# tail. We have to use it to hand in the appropriate amount of bytes during
		# the next read.
		tail = self._zip.unconsumed_tail
		if tail:
			# zlib did not consume the end of the previous window, start there again.
			# For code-clarity, we just take the chunk from our map again instead 
			# of reusing the unconsumed tail. The latter one would save some memory 
			# copying, but we could end up with not getting enough data uncompressed.
			self._cws = self._cwe - len(tail)
		else:
			# the previous window was consumed entirely, continue right behind it
			self._cws = self._cwe
		# END handle tail
		
		# Assume the worst case, hence the data is uncompressed and the window
		# needs to be as large as the uncompressed bytes we want to read.
		# The end is always based on the new start. Basing it on the previous 
		# start yields empty windows, or hands already consumed bytes to zlib again.
		self._cwe = self._cws + size
		indata = self._m[self._cws:self._cwe]		# another copy ... :(
		# get the actual window end to be sure we don't use it for computations
		self._cwe = self._cws + len(indata)
			
		dcompdat = self._zip.decompress(indata, size)
		
		self._br += len(dcompdat)
		if dat:
			dcompdat = dat + dcompdat
			
		return dcompdat
		
#} END RO streams


#{ W Streams

class Sha1Writer(object):
	"""Minimal stream writer which digests everything written to it, allowing
	the sha of the data seen so far to be queried at any time"""
	
	def __init__(self):
		# start out with an empty digest
		self.sha1 = make_sha("")

	#{ Stream Interface

	def write(self, data):
		"""Feed data into the digest. This implementation does not raise on its
		own, it only updates the digest.
		:return: length of incoming data"""
		digest = self.sha1
		digest.update(data)
		return len(data)

	#} END stream interface 

	#{ Interface
	
	def sha(self, as_hex = False):
		""":return: sha of all data written so far
		:param as_hex: if True, sha will be hex-encoded, binary otherwise"""
		return self.sha1.hexdigest() if as_hex else self.sha1.digest()
	
	#} END interface 

class FDCompressedSha1Writer(Sha1Writer):
	"""Digests data written to it, making the sha available, then compress the 
	data and write it to the file descriptor
	:note: operates on raw file descriptors
	:note: for this to work, you have to use the close-method of this instance, 
		as it writes the data still buffered by the compressor"""
	__slots__ = ("fd", "sha1", "zip")
	
	# default exception. It serves as template only: a new instance is raised 
	# for each failure so that no state is shared between unrelated errors
	exc = IOError("Failed to write all bytes to filedescriptor")
	
	def __init__(self, fd):
		""":param fd: file descriptor to write the compressed data to"""
		super(FDCompressedSha1Writer, self).__init__()
		self.fd = fd
		self.zip = zlib.compressobj(Z_BEST_SPEED)

	def _write_error(self):
		""":return: new exception instance of the type and with the arguments of exc"""
		return self.exc.__class__(*self.exc.args)

	#{ Stream Interface

	def write(self, data):
		"""Digest data, compress it and write the compressed bytes to our descriptor
		:raise IOError: If not all bytes could be written
		:return: length of incoming data"""
		self.sha1.update(data)
		cdata = self.zip.compress(data)
		bytes_written = write(self.fd, cdata)
		if bytes_written != len(cdata):
			raise self._write_error()
		return len(data)

	def close(self):
		"""Write the remaining compressed data and close our file descriptor. 
		The descriptor is closed even if the remaining data could not be written.
		:raise IOError: If not all bytes could be written
		:return: return value of the call closing the descriptor"""
		remainder = self.zip.flush()
		try:
			if write(self.fd, remainder) != len(remainder):
				raise self._write_error()
		finally:
			# never leak the descriptor, not even if the last write failed
			rval = close(self.fd)
		# END assure descriptor is closed
		return rval

	#} END stream interface

#} END W streams