author    Sebastian Thiel <byronimo@gmail.com>  2010-06-04 17:30:31 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-04 17:30:31 +0200
commit    6fbb69306c0e14bacb8dcb92a89af27d3d5d631f
tree      4a6e0c14b412315c13cc4ac6466f4888644195a3  /lib/git/odb/stream.py
parent    25dca42bac17d511b7e2ebdd9d1d679e7626db5f
parent    e746f96bcc29238b79118123028ca170adc4ff0f
Merge branch 'odb'
Conflicts: lib/git/cmd.py
Diffstat (limited to 'lib/git/odb/stream.py')
-rw-r--r--  lib/git/odb/stream.py  446
1 file changed, 446 insertions(+), 0 deletions(-)
diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py
new file mode 100644
index 00000000..d1181382
--- /dev/null
+++ b/lib/git/odb/stream.py
@@ -0,0 +1,446 @@
+import zlib
+from cStringIO import StringIO
+from git.utils import make_sha
+import errno
+
+from utils import (
+ to_hex_sha,
+ to_bin_sha,
+ write,
+ close
+ )
+
+__all__ = ('OInfo', 'OStream', 'IStream', 'InvalidOInfo', 'InvalidOStream',
+ 'DecompressMemMapReader', 'FDCompressedSha1Writer')
+
+
+# ZLIB configuration
+# used when compressing objects - levels range from 1 (fastest) to 9 (slowest, best compression)
+Z_BEST_SPEED = 1
+
+
+#{ ODB Bases
+
+class OInfo(tuple):
+ """Carries information about an object in an ODB, provdiing information
+ about the sha of the object, the type_string as well as the uncompressed size
+ in bytes.
+
+ It can be accessed using tuple notation and using attribute access notation::
+
+ assert dbi[0] == dbi.sha
+ assert dbi[1] == dbi.type
+ assert dbi[2] == dbi.size
+
+ The type is designed to be as lightweight as possible."""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, type, size):
+ return tuple.__new__(cls, (sha, type, size))
+
+ def __init__(self, *args):
+ tuple.__init__(self)
+
+ #{ Interface
+ @property
+ def sha(self):
+ return self[0]
+
+ @property
+ def type(self):
+ return self[1]
+
+ @property
+ def size(self):
+ return self[2]
+ #} END interface
+
+
+class OStream(OInfo):
+ """Base for object streams retrieved from the database, providing additional
+ information about the stream.
+ Generally, ODB streams are read-only as objects are immutable"""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, type, size, stream, *args, **kwargs):
+ """Helps with the initialization of subclasses"""
+ return tuple.__new__(cls, (sha, type, size, stream))
+
+
+ def __init__(self, *args, **kwargs):
+ tuple.__init__(self)
+
+ #{ Interface
+
+ def is_compressed(self):
+ """:return: True if reads of this stream yield zlib compressed data. Default False
+ :note: this does not imply anything about the actual internal storage.
+ Hence the data could be uncompressed, but read compressed, or vice versa"""
+ return False
+
+ #} END interface
+
+ #{ Stream Reader Interface
+
+ def read(self, size=-1):
+ return self[3].read(size)
+
+ #} END stream reader interface
+
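+# A minimal usage sketch ('ostream' is a hypothetical instance as it would be
+# handed out by an object database query):
+#
+#   data = ostream.read() # the entire object's data
+#   if not ostream.is_compressed():
+#       assert len(data) == ostream.size
+#   # END check plain streams only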
+
+class IStream(list):
+ """Represents an input content stream to be fed into the ODB. It is mutable to allow
+ the ODB to record information about the operation's outcome right in this instance.
+
+ It provides the interfaces of OStream and a stream reader to allow the instance
+ to blend in without prior conversion.
+
+ The only method your content stream must support is 'read'"""
+ __slots__ = tuple()
+
+ def __new__(cls, type, size, stream, sha=None, compressed=False):
+ return list.__new__(cls, (sha, type, size, stream, compressed, None))
+
+ def __init__(self, type, size, stream, sha=None, compressed=False):
+ list.__init__(self, (sha, type, size, stream, compressed, None))
+
+ #{ Interface
+
+ def hexsha(self):
+ """:return: our sha, hex encoded, 40 bytes"""
+ return to_hex_sha(self[0])
+
+ def binsha(self):
+ """:return: our sha as binary, 20 bytes"""
+ return to_bin_sha(self[0])
+
+ def _error(self):
+ """:return: the error that occurred when processing the stream, or None"""
+ return self[5]
+
+ def _set_error(self, exc):
+ """Set this input stream to the given exc, may be None to reset the error"""
+ self[5] = exc
+
+ error = property(_error, _set_error)
+
+ #} END interface
+
+ #{ Stream Reader Interface
+
+ def read(self, size=-1):
+ """Implements a simple stream reader interface, passing the read call on
+ to our internal stream"""
+ return self[3].read(size)
+
+ #} END stream reader interface
+
+ #{ ODB info interface
+
+ def _set_sha(self, sha):
+ self[0] = sha
+
+ def _sha(self):
+ return self[0]
+
+ sha = property(_sha, _set_sha)
+
+
+ def _type(self):
+ return self[1]
+
+ def _set_type(self, type):
+ self[1] = type
+
+ type = property(_type, _set_type)
+
+ def _size(self):
+ return self[2]
+
+ def _set_size(self, size):
+ self[2] = size
+
+ size = property(_size, _set_size)
+
+ def _stream(self):
+ return self[3]
+
+ def _set_stream(self, stream):
+ self[3] = stream
+
+ stream = property(_stream, _set_stream)
+
+ #} END odb info interface
+
+ #{ OStream interface
+
+ def is_compressed(self):
+ return self[4]
+
+ #} END OStream interface
+
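+# A minimal usage sketch - the caller provides type, size and stream, while the
+# database fills in the sha once the object was written ('odb' is a hypothetical
+# database instance offering a 'store' method, 'data' a string of object bytes):
+#
+#   istream = IStream("blob", len(data), StringIO(data))
+#   odb.store(istream)
+#   assert istream.sha is not None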
+
+class InvalidOInfo(tuple):
+ """Carries information about a sha identifying an object which is invalid in
+ the queried database. The exception attribute provides more information about
+ the cause of the issue"""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, exc):
+ return tuple.__new__(cls, (sha, exc))
+
+ def __init__(self, sha, exc):
+ tuple.__init__(self, (sha, exc))
+
+ @property
+ def sha(self):
+ return self[0]
+
+ @property
+ def error(self):
+ """:return: exception instance explaining the failure"""
+ return self[1]
+
+
+class InvalidOStream(InvalidOInfo):
+ """Carries information about an invalid ODB stream"""
+ __slots__ = tuple()
+
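+# A minimal handling sketch - databases may yield these instead of raising,
+# which keeps batched queries going ('info' is a hypothetical query result):
+#
+#   if isinstance(info, InvalidOInfo):
+#       print "Could not read %s: %s" % (info.sha, info.error)
+#   # END handle invalid objects
+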
+#} END ODB Bases
+
+
+#{ RO Streams
+
+class DecompressMemMapReader(object):
+ """Reads data in chunks from a memory map and decompresses it. The client sees
+ only the uncompressed data, respective file-like read calls are handling on-demand
+ buffered decompression accordingly
+
+ A constraint on the total size of bytes is activated, simulating
+ a logical file within a possibly larger physical memory area
+
+ To read efficiently, you clearly don't want to read individual bytes, instead,
+ read a few kilobytes at least.
+
+ :note: The chunk-size should be carefully selected as it will involve quite a bit
+ of string copying due to the way the zlib is implemented. Its very wasteful,
+ hence we try to find a good tradeoff between allocation time and number of
+ times we actually allocate. An own zlib implementation would be good here
+ to better support streamed reading - it would only need to keep the mmap
+ and decompress it into chunks, thats all ... """
+ __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
+
+ max_read_size = 512*1024
+
+ def __init__(self, m, close_on_deletion, size):
+ """Initialize with mmap for stream reading"""
+ self._m = m
+ self._zip = zlib.decompressobj()
+ self._buf = None # buffer of decompressed bytes
+ self._buflen = 0 # length of bytes in buffer
+ self._s = size # size of uncompressed data to read in total
+ self._br = 0 # num uncompressed bytes read
+ self._cws = 0 # start byte of compression window
+ self._cwe = 0 # end byte of compression window
+ self._close = close_on_deletion # close the memmap on deletion ?
+
+ def __del__(self):
+ if self._close:
+ self._m.close()
+ # END handle resource freeing
+
+ @classmethod
+ def new(cls, m, close_on_deletion=False):
+ """Create a new DecompressMemMapReader instance for acting as a read-only stream
+ This method parses the object header from m and returns the parsed
+ type and size, as well as the created stream instance.
+ :param m: memory map on which to operate
+ :param close_on_deletion: if True, the memory map will be closed once we are
+ being deleted"""
+ inst = DecompressMemMapReader(m, close_on_deletion, 0)
+
+ # read the header; the decompressed stream of a loose object starts with "<type> <size>\0"
+ maxb = 512 # should really be enough, cgit uses 8192 I believe
+ inst._s = maxb
+ hdr = inst.read(maxb)
+ hdrend = hdr.find("\0")
+ type, size = hdr[:hdrend].split(" ")
+ size = int(size)
+ inst._s = size
+
+ # adjust internal state to match actual header length that we ignore
+ # The buffer will be depleted first on future reads
+ inst._br = 0
+ hdrend += 1 # count terminating \0
+ inst._buf = StringIO(hdr[hdrend:])
+ inst._buflen = len(hdr) - hdrend
+
+ return type, size, inst
+
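+ # A minimal usage sketch, assuming 'path' points to a loose object file
+ # whose decompressed stream starts with the "<type> <size>\0" header:
+ #
+ # import mmap, os
+ # fd = os.open(path, os.O_RDONLY)
+ # m = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
+ # os.close(fd)
+ # type, size, stream = DecompressMemMapReader.new(m, close_on_deletion=True)
+ # assert len(stream.read()) == size
+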
+ def read(self, size=-1):
+ if size < 1:
+ size = self._s - self._br
+ else:
+ size = min(size, self._s - self._br)
+ # END clamp size
+
+ if size == 0:
+ return str()
+ # END handle depletion
+
+ # protect from memory peaks
+ # If the user tries to read large chunks, our memory patterns get really bad
+ # as we end up copying a possibly huge chunk from our memory map right into
+ # memory. This might not even be possible. Nonetheless, try to dampen the
+ # effect a bit by reading in chunks, returning a huge string in the end.
+ # Our performance now depends on StringIO. This way we don't need two large
+ # buffers in peak times, but only one large one in the end which is
+ # the return buffer
+ # NO: We don't do it - if the user thinks it's best, he is right. If he
+ # has trouble, he will start reading in chunks. According to our tests,
+ # it's still faster if we read 10 Mb at once instead of chunking it.
+
+ # if size > self.max_read_size:
+ # sio = StringIO()
+ # while size:
+ # read_size = min(self.max_read_size, size)
+ # data = self.read(read_size)
+ # sio.write(data)
+ # size -= len(data)
+ # if len(data) < read_size:
+ # break
+ # # END data loop
+ # sio.seek(0)
+ # return sio.getvalue()
+ # # END handle maxread
+ #
+ # deplete the buffer, then just continue using the decompress object
+ # which has its own buffer. We just need this to transparently parse the
+ # header from the zlib stream
+ dat = str()
+ if self._buf:
+ if self._buflen >= size:
+ # have enough data
+ dat = self._buf.read(size)
+ self._buflen -= size
+ self._br += size
+ return dat
+ else:
+ dat = self._buf.read() # ouch, duplicates data
+ size -= self._buflen
+ self._br += self._buflen
+
+ self._buflen = 0
+ self._buf = None
+ # END handle buffer len
+ # END handle buffer
+
+ # decompress some data
+ # Abstract: zlib needs to operate on chunks of our memory map ( which may
+ # be large ), as it will otherwise always fill in the 'unconsumed_tail'
+ # attribute, which possibly reads our whole map to the end, forcing
+ # everything to be read from disk even though just a portion was requested.
+ # As this would be a no-go, we work around it by passing only chunks of data,
+ # moving the window into the memory map along as we decompress, which keeps
+ # the tail smaller than our chunk-size. This causes 'only' the chunk to be
+ # copied once, and another copy of a part of it when it creates the unconsumed
+ # tail. We have to use it to hand in the appropriate amount of bytes during
+ # the next read.
+ tail = self._zip.unconsumed_tail
+ if tail:
+ # move the window, make it as large as size demands. For code-clarity,
+ # we just take the chunk from our map again instead of reusing the unconsumed
+ # tail. The latter would save some memory copying, but we could end up
+ # not getting enough data uncompressed, so we would have to sort that out as well.
+ # Now we just assume the worst case, hence the data is uncompressed and the window
+ # needs to be as large as the uncompressed bytes we want to read.
+ self._cws = self._cwe - len(tail)
+ self._cwe = self._cws + size
+
+ indata = self._m[self._cws:self._cwe] # another copy ... :(
+ # get the actual window end to be sure we don't use it for computations
+ self._cwe = self._cws + len(indata)
+ else:
+ cws = self._cws
+ self._cws = self._cwe
+ self._cwe = cws + size
+ indata = self._m[self._cws:self._cwe] # ... copy it again :(
+ # END handle tail
+
+ dcompdat = self._zip.decompress(indata, size)
+
+ self._br += len(dcompdat)
+ if dat:
+ dcompdat = dat + dcompdat
+
+ return dcompdat
+
+#} END RO streams
+
+
+#{ W Streams
+
+class Sha1Writer(object):
+ """Simple stream writer which produces a sha whenever you like as it degests
+ everything it is supposed to write"""
+
+ def __init__(self):
+ self.sha1 = make_sha("")
+
+ #{ Stream Interface
+
+ def write(self, data):
+ """:raise IOError: If not all bytes could be written
+ :return: length of incoming data"""
+ self.sha1.update(data)
+ return len(data)
+
+ #} END stream interface
+
+ #{ Interface
+
+ def sha(self, as_hex = False):
+ """:return: sha so far
+ :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
+ if as_hex:
+ return self.sha1.hexdigest()
+ return self.sha1.digest()
+
+ #} END interface
+
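+# A minimal usage sketch - git shas are computed over "<type> <size>\0" plus the
+# object's data, so writing both yields the object's sha ('blob 5\0hello' serves
+# purely as an illustrative example here):
+#
+#   writer = Sha1Writer()
+#   writer.write("blob 5\0hello")
+#   hexsha = writer.sha(as_hex=True)
+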
+class FDCompressedSha1Writer(Sha1Writer):
+ """Digests data written to it, making the sha available, then compress the
+ data and write it to the file descriptor
+ :note: operates on raw file descriptors
+ :note: for this to work, you have to use the close-method of this instance"""
+ __slots__ = ("fd", "sha1", "zip")
+
+ # default exception
+ exc = IOError("Failed to write all bytes to file descriptor")
+
+ def __init__(self, fd):
+ super(FDCompressedSha1Writer, self).__init__()
+ self.fd = fd
+ self.zip = zlib.compressobj(Z_BEST_SPEED)
+
+ #{ Stream Interface
+
+ def write(self, data):
+ """:raise IOError: If not all bytes could be written
+ :return: length of incoming data"""
+ self.sha1.update(data)
+ cdata = self.zip.compress(data)
+ bytes_written = write(self.fd, cdata)
+ if bytes_written != len(cdata):
+ raise self.exc
+ return len(data)
+
+ def close(self):
+ remainder = self.zip.flush()
+ if write(self.fd, remainder) != len(remainder):
+ raise self.exc
+ return close(self.fd)
+
+ #} END stream interface
+
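+# A minimal usage sketch, assuming 'path' names the destination file:
+#
+#   import os
+#   fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0644)
+#   writer = FDCompressedSha1Writer(fd)
+#   writer.write("blob 11\0hello world")
+#   writer.close() # flushes the remaining compressed data and closes the fd
+#   hexsha = writer.sha(as_hex=True)
+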
+#} END W streams