Diffstat (limited to 'lib/git/odb/stream.py')
-rw-r--r--  lib/git/odb/stream.py  168
1 file changed, 113 insertions, 55 deletions
diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py
index 325c1444..d1181382 100644
--- a/lib/git/odb/stream.py
+++ b/lib/git/odb/stream.py
@@ -5,10 +5,13 @@ import errno
from utils import (
to_hex_sha,
- to_bin_sha
+ to_bin_sha,
+ write,
+ close
)
-__all__ = ('FDCompressedSha1Writer', 'DecompressMemMapReader')
+__all__ = ('OInfo', 'OStream', 'IStream', 'InvalidOInfo', 'InvalidOStream',
+ 'DecompressMemMapReader', 'FDCompressedSha1Writer')
# ZLIB configuration
@@ -18,7 +21,7 @@ Z_BEST_SPEED = 1
#{ ODB Bases
-class ODB_Info(tuple):
+class OInfo(tuple):
"""Carries information about an object in an ODB, provdiing information
about the sha of the object, the type_string as well as the uncompressed size
in bytes.
@@ -35,8 +38,8 @@ class ODB_Info(tuple):
def __new__(cls, sha, type, size):
return tuple.__new__(cls, (sha, type, size))
- def __init__(self, sha, type, size):
- pass
+ def __init__(self, *args):
+ tuple.__init__(self)
#{ Interface
@property
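
A minimal usage sketch for the renamed OInfo tuple (not part of the patch; the
sha value is made up, and the sha/type/size accessors are assumed from the
elided interface section above):

    info = OInfo("ca97f4f1...", "blob", 42)   # (sha, type, size)
    assert info.sha == "ca97f4f1..."          # backed by self[0]
    assert info.type == "blob"                # backed by self[1]
    assert info.size == 42                    # backed by self[2]
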
@@ -53,38 +56,52 @@ class ODB_Info(tuple):
#} END interface
-class ODB_OStream(ODB_Info):
+class OStream(OInfo):
"""Base for object streams retrieved from the database, providing additional
information about the stream.
Generally, ODB streams are read-only as objects are immutable"""
__slots__ = tuple()
- def __new__(cls, sha, type, size, *args, **kwargs):
+ def __new__(cls, sha, type, size, stream, *args, **kwargs):
"""Helps with the initialization of subclasses"""
- return tuple.__new__(cls, (sha, type, size))
+ return tuple.__new__(cls, (sha, type, size, stream))
+
+
+ def __init__(self, *args, **kwargs):
+ tuple.__init__(self)
+ #{ Interface
def is_compressed(self):
- """:return: True if reads of this stream yield zlib compressed data.
+ """:return: True if reads of this stream yield zlib compressed data. Default False
:note: this does not imply anything about the actual internal storage.
Hence the data could be uncompressed, but read compressed, or vice versa"""
- raise NotImplementedError("To be implemented by subclass")
+ return False
+
+ #} END interface
+
+ #{ Stream Reader Interface
+
+ def read(self, size=-1):
+ return self[3].read(size)
+
+ #} END stream reader interface
-class ODB_IStream(list):
+class IStream(list):
"""Represents an input content stream to be fed into the ODB. It is mutable to allow
the ODB to record information about the operation's outcome right in this instance.
- It provides interfaces for the ODB_OStream and a StreamReader to allow the instance
+ It provides interfaces for the OStream and a StreamReader to allow the instance
to blend in without prior conversion.
The only method your content stream must support is 'read'"""
__slots__ = tuple()
def __new__(cls, type, size, stream, sha=None, compressed=False):
- list.__new__(cls, (sha, type, size, stream, compressed, None))
+ return list.__new__(cls, (sha, type, size, stream, compressed, None))
- def __init__(cls, type, size, stream, sha=None, compressed=None):
- pass
+ def __init__(self, type, size, stream, sha=None, compressed=None):
+ list.__init__(self, (sha, type, size, stream, compressed, None))
#{ Interface
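
A quick sketch of the new OStream shape (illustrative only; the sha and content
are made up): the fourth tuple slot now carries the wrapped stream, and read()
simply delegates to it.

    from cStringIO import StringIO

    raw = StringIO("hello")
    ostream = OStream("ca97f4f1...", "blob", 5, raw)
    assert ostream.read() == "hello"          # delegates to self[3].read
    assert ostream.is_compressed() is False   # base default
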
@@ -127,25 +144,42 @@ class ODB_IStream(list):
sha = property(_sha, _set_sha)
- @property
- def type(self):
+
+ def _type(self):
return self[1]
+
+ def _set_type(self, type):
+ self[1] = type
- @property
- def size(self):
+ type = property(_type, _set_type)
+
+ def _size(self):
return self[2]
+
+ def _set_size(self, size):
+ self[2] = size
+
+ size = property(_size, _set_size)
+
+ def _stream(self):
+ return self[3]
+
+ def _set_stream(self, stream):
+ self[3] = stream
+
+ stream = property(_stream, _set_stream)
#} END odb info interface
- #{ ODB_OStream interface
+ #{ OStream interface
def is_compressed(self):
return self[4]
- #} END ODB_OStream interface
+ #} END OStream interface
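
IStream stays a mutable list precisely so the database can write results back
into the instance; a sketch (not part of the patch, sha value made up):

    from cStringIO import StringIO

    istream = IStream("blob", 5, StringIO("hello"))
    assert istream.sha is None        # not yet computed by the ODB
    istream.sha = "ca97f4f1..."       # the ODB records the sha it computed
    istream.size = 5                  # size, type and stream are writable too
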
-class InvalidODB_Info(tuple):
+class InvalidOInfo(tuple):
"""Carries information about a sha identifying an object which is invalid in
the queried database. The exception attribute provides more information about
the cause of the issue"""
@@ -155,7 +189,7 @@ class InvalidODB_Info(tuple):
return tuple.__new__(cls, (sha, exc))
def __init__(self, sha, exc):
- pass
+ tuple.__init__(self, (sha, exc))
@property
def sha(self):
@@ -166,7 +200,8 @@ class InvalidODB_Info(tuple):
""":return: exception instance explaining the failure"""
return self[1]
-class InvalidODB_OStream(InvalidODB_Info):
+
+class InvalidOStream(InvalidOInfo):
"""Carries information about an invalid ODB stream"""
__slots__ = tuple()
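
How a caller might use these invalid-info types (a sketch, not part of the
patch; the accessor returning self[1] is elided above and assumed here to be
named 'error'):

    def object_info_or_raise(info):
        # an InvalidOInfo carries the failure for the queried sha
        if isinstance(info, InvalidOInfo):
            raise info.error
        return info
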
@@ -175,7 +210,7 @@ class InvalidODB_OStream(InvalidODB_Info):
#{ RO Streams
-class DecompressMemMapReader(ODB_OStream):
+class DecompressMemMapReader(object):
"""Reads data in chunks from a memory map and decompresses it. The client sees
only the uncompressed data, respective file-like read calls are handling on-demand
buffered decompression accordingly
@@ -192,17 +227,17 @@ class DecompressMemMapReader(ODB_OStream):
times we actually allocate. A custom zlib implementation would be good here
to better support streamed reading - it would only need to keep the mmap
and decompress it into chunks, that's all ...
- # __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
+ __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
max_read_size = 512*1024
- def __init__(self, sha, type, size, m, close_on_deletion):
+ def __init__(self, m, close_on_deletion, size):
"""Initialize with mmap for stream reading"""
self._m = m
self._zip = zlib.decompressobj()
self._buf = None # buffer of decompressed bytes
self._buflen = 0 # length of bytes in buffer
- self._s = 0 # size of uncompressed data to read in total
+ self._s = size # size of uncompressed data to read in total
self._br = 0 # num uncompressed bytes read
self._cws = 0 # start byte of compression window
self._cwe = 0 # end byte of compression window
@@ -213,34 +248,33 @@ class DecompressMemMapReader(ODB_OStream):
self._m.close()
# END handle resource freeing
- def initialize(self, size=0):
- """Initialize this instance for acting as a read-only stream for size bytes.
- :param size: size in bytes to be decompresed before being depleted.
- If 0, default object header information is parsed from the data,
- returning a tuple of (type_string, uncompressed_size)
- If not 0, the size will be used, and None is returned.
- :note: must only be called exactly once"""
- if size:
- self._s = size
- return
- # END handle size
+ @classmethod
+ def new(cls, m, close_on_deletion=False):
+ """Create a new DecompressMemMapReader instance for acting as a read-only stream.
+ This method parses the object header from m and returns the parsed
+ type and size, as well as the created stream instance.
+ :param m: memory map on which to operate
+ :param close_on_deletion: if True, the memory map will be closed once we are
+ being deleted"""
+ inst = cls(m, close_on_deletion, 0)
# read header
maxb = 512 # should really be enough, cgit uses 8192 I believe
- self._s = maxb
- hdr = self.read(maxb)
+ inst._s = maxb
+ hdr = inst.read(maxb)
hdrend = hdr.find("\0")
type, size = hdr[:hdrend].split(" ")
- self._s = int(size)
+ size = int(size)
+ inst._s = size
# adjust internal state to match actual header length that we ignore
# The buffer will be depleted first on future reads
- self._br = 0
+ inst._br = 0
hdrend += 1 # count terminating \0
- self._buf = StringIO(hdr[hdrend:])
- self._buflen = len(hdr) - hdrend
+ inst._buf = StringIO(hdr[hdrend:])
+ inst._buflen = len(hdr) - hdrend
- return type, size
+ return type, size, inst
def read(self, size=-1):
if size < 1:
@@ -346,7 +380,35 @@ class DecompressMemMapReader(ODB_OStream):
#{ W Streams
-class FDCompressedSha1Writer(object):
+class Sha1Writer(object):
+ """Simple stream writer which produces a sha whenever you like as it degests
+ everything it is supposed to write"""
+
+ def __init__(self):
+ self.sha1 = make_sha("")
+
+ #{ Stream Interface
+
+ def write(self, data):
+ """:raise IOError: If not all bytes could be written
+ :return: length of incoming data"""
+ self.sha1.update(data)
+ return len(data)
+
+ #} END stream interface
+
+ #{ Interface
+
+ def sha(self, as_hex = False):
+ """:return: sha so far
+ :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
+ if as_hex:
+ return self.sha1.hexdigest()
+ return self.sha1.digest()
+
+ #} END interface
+
+class FDCompressedSha1Writer(Sha1Writer):
"""Digests data written to it, making the sha available, then compress the
data and write it to the file descriptor
:note: operates on raw file descriptors
@@ -357,10 +419,12 @@ class FDCompressedSha1Writer(object):
exc = IOError("Failed to write all bytes to filedescriptor")
def __init__(self, fd):
+ super(FDCompressedSha1Writer, self).__init__()
self.fd = fd
- self.sha1 = make_sha("")
self.zip = zlib.compressobj(Z_BEST_SPEED)
+ #{ Stream Interface
+
def write(self, data):
""":raise IOError: If not all bytes could be written
:return: length of incoming data"""
@@ -371,18 +435,12 @@ class FDCompressedSha1Writer(object):
raise self.exc
return len(data)
- def sha(self, as_hex = False):
- """:return: sha so far
- :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
- if as_hex:
- return self.sha1.hexdigest()
- return self.sha1.digest()
-
def close(self):
remainder = self.zip.flush()
if write(self.fd, remainder) != len(remainder):
raise self.exc
return close(self.fd)
+ #} END stream interface
#} END W streams
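
A usage sketch tying the new writers and the reader together (illustrative
only; the path and the object payload are made up):

    import os, mmap

    # Digest-only: Sha1Writer never stores data, it just hashes it.
    writer = Sha1Writer()
    writer.write("blob 5\0hello")
    print writer.sha(as_hex=True)

    # Compress-and-store: write git-style object data through a raw fd.
    fd = os.open("/tmp/example_obj", os.O_WRONLY | os.O_CREAT, 0644)
    zwriter = FDCompressedSha1Writer(fd)
    zwriter.write("blob 5\0hello")
    zwriter.close()                      # flushes zlib and closes the fd

    # Read it back: new() parses the '<type> <size>\0' header for us.
    fd = os.open("/tmp/example_obj", os.O_RDONLY)
    m = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    os.close(fd)
    type, size, stream = DecompressMemMapReader.new(m, close_on_deletion=True)
    assert (type, size) == ("blob", 5)
    assert stream.read() == "hello"
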