diff options
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/objects/base.py | 8 | ||||
-rw-r--r-- | lib/git/objects/commit.py | 2 | ||||
-rw-r--r-- | lib/git/odb/db.py | 114 | ||||
-rw-r--r-- | lib/git/odb/stream.py | 388 | ||||
-rw-r--r-- | lib/git/odb/utils.py | 215 |
5 files changed, 465 insertions, 262 deletions
diff --git a/lib/git/objects/base.py b/lib/git/objects/base.py index 446c4406..76384888 100644 --- a/lib/git/objects/base.py +++ b/lib/git/objects/base.py @@ -76,10 +76,10 @@ class Object(LazyMixin): Retrieve object information """ if attr == "size": - typename, self.size = self.repo.odb.object_info(self.sha) + typename, self.size = self.repo.odb.info(self.sha) assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type) elif attr == "data": - typename, self.size, stream = self.repo.odb.object(self.sha) + typename, self.size, stream = self.repo.odb.stream(self.sha) self.data = stream.read() # once we have an own odb, we can delay reading assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type) else: @@ -124,14 +124,14 @@ class Object(LazyMixin): def data_stream(self): """ :return: File Object compatible stream to the uncompressed raw data of the object :note: returned streams must be read in order""" - type, size, stream = self.repo.odb.object(self.sha) + type, size, stream = self.repo.odb.stream(self.sha) return stream def stream_data(self, ostream): """Writes our data directly to the given output stream :param ostream: File object compatible stream object. :return: self""" - type, size, istream = self.repo.odb.object(self.sha) + type, size, istream = self.repo.odb.stream(self.sha) stream_copy(istream, ostream) return self diff --git a/lib/git/objects/commit.py b/lib/git/objects/commit.py index d56ce306..dbc0cf27 100644 --- a/lib/git/objects/commit.py +++ b/lib/git/objects/commit.py @@ -346,7 +346,7 @@ class Commit(base.Object, Iterable, diff.Diffable, utils.Traversable, utils.Seri streamlen = stream.tell() stream.seek(0) - new_commit.sha = repo.odb.to_object(cls.type, streamlen, stream, sha_as_hex=True) + new_commit.sha = repo.odb.store(cls.type, streamlen, stream, sha_as_hex=True) if head: try: diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 1d1d4c40..7ae8f446 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -6,9 +6,12 @@ from git.errors import ( BadObjectType ) -from utils import ( +from stream import ( DecompressMemMapReader, - FDCompressedSha1Writer, + FDCompressedSha1Writer + ) + +from utils import ( ENOENT, to_hex_sha, exists, @@ -31,7 +34,7 @@ import mmap import os -class iObjectDBR(object): +class ObjectDBR(object): """Defines an interface for object database lookup. Objects are identified either by hex-sha (40 bytes) or by sha (20 bytes)""" @@ -48,62 +51,87 @@ class iObjectDBR(object): :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") - def object(self, sha): - """ - :return: tuple(type_string, size_in_bytes, stream) a tuple with object - information including its type, its size as well as a stream from which its - contents can be read + def info(self, sha): + """ :return: ODB_Info instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") - def object_info(self, sha): - """ - :return: tuple(type_string, size_in_bytes) tuple with the object's type - string as well as its size in bytes + def info_async(self, input_channel): + """Retrieve information of a multitude of objects asynchronously + :param input_channel: Channel yielding the sha's of the objects of interest + :return: Channel yielding ODB_Info|InvalidODB_Info, in any order""" + raise NotImplementedError("To be implemented in subclass") + + def stream(self, sha): + """:return: ODB_OStream instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") + + def stream_async(self, input_channel): + """Retrieve the ODB_OStream of multiple objects + :param input_channel: see ``info`` + :param max_threads: see ``ObjectDBW.store`` + :return: Channel yielding ODB_OStream|InvalidODB_OStream instances in any order""" + raise NotImplementedError("To be implemented in subclass") #} END query interface -class iObjectDBW(object): +class ObjectDBW(object): """Defines an interface to create objects in the database""" - __slots__ = tuple() + __slots__ = "_ostream" + + def __init__(self, *args, **kwargs): + self._ostream = None #{ Edit Interface + def set_ostream(self, stream): + """Adjusts the stream to which all data should be sent when storing new objects + :param stream: if not None, the stream to use, if None the default stream + will be used. + :return: previously installed stream, or None if there was no override + :raise TypeError: if the stream doesn't have the supported functionality""" + cstream = self._ostream + self._ostream = stream + return cstream + + def ostream(self): + """:return: overridden output stream this instance will write to, or None + if it will write to the default stream""" + return self._ostream - def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): + def store(self, istream): """Create a new object in the database - :return: the sha identifying the object in the database - :param type: type string identifying the object - :param size: size of the data to read from stream - :param stream: stream providing the data - :param dry_run: if True, the object database will not actually be changed - :param sha_as_hex: if True, the returned sha identifying the object will be - hex encoded, not binary + :return: the input istream object with its sha set to its corresponding value + :param istream: ODB_IStream compatible instance. If its sha is already set + to a value, the object will just be stored in the our database format, + in which case the input stream is expected to be in object format ( header + contents ). :raise IOError: if data could not be written""" raise NotImplementedError("To be implemented in subclass") - def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): - """Create multiple new objects in the database - :return: sequence of shas identifying the created objects in the order in which - they where given. - :param iter_info: iterable yielding tuples containing the type_string - size_in_bytes and the steam with the content data. - :param dry_run: see ``to_object`` - :param sha_as_hex: see ``to_object`` - :param max_threads: if < 1, any number of threads may be started while processing - the request, otherwise the given number of threads will be started. - :raise IOError: if data could not be written""" + def store_async(self, input_channel): + """Create multiple new objects in the database asynchronously. The method will + return right away, returning an output channel which receives the results as + they are computed. + + :return: Channel yielding your ODB_IStream which served as input, in any order. + The IStreams sha will be set to the sha it received during the process, + or its error attribute will be set to the exception informing about the error. + :param input_channel: Channel yielding ODB_IStream instance. + As the same instances will be used in the output channel, you can create a map + between the id(istream) -> istream + :note:As some ODB implementations implement this operation as atomic, they might + abort the whole operation if one item could not be processed. Hence check how + many items have actually been produced.""" # a trivial implementation, ignoring the threads for now # TODO: add configuration to the class to determine whether we may # actually use multiple threads, default False of course. If the add shas = list() for args in iter_info: - shas.append(self.to_object(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) + shas.append(self.store(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) return shas - + #} END edit interface @@ -118,6 +146,7 @@ class FileDBBase(object): :raise InvalidDBRoot: :note: The base will perform basic checking for accessability, but the subclass is required to verify that the root_path contains the database structure it needs""" + super(FileDBBase, self).__init__() if not os.path.isdir(root_path): raise InvalidDBRoot(root_path) self._root_path = root_path @@ -141,7 +170,7 @@ class FileDBBase(object): #} END utilities -class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): +class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): """A database which operates on loose object files""" __slots__ = ('_hexsha_to_file', '_fd_open_flags') # CONFIGURATION @@ -210,7 +239,7 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): os.close(fd) # END assure file is closed - def object_info(self, sha): + def info(self, sha): m = self._map_loose_object(sha) try: return loose_object_header_info(m) @@ -233,8 +262,9 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): return False # END check existance - def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): + def store(self, istream): # open a tmp file to write the data to + # todo: implement ostream properly fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) writer = FDCompressedSha1Writer(fd) @@ -269,11 +299,11 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): return sha -class PackedDB(FileDBBase, iObjectDBR): +class PackedDB(FileDBBase, ObjectDBR): """A database operating on a set of object packs""" -class CompoundDB(iObjectDBR): +class CompoundDB(ObjectDBR): """A database which delegates calls to sub-databases""" @@ -281,7 +311,7 @@ class ReferenceDB(CompoundDB): """A database consisting of database referred to in a file""" -#class GitObjectDB(CompoundDB, iObjectDBW): +#class GitObjectDB(CompoundDB, ObjectDBW): class GitObjectDB(LooseObjectDB): """A database representing the default git object store, which includes loose objects, pack files and an alternates file @@ -296,7 +326,7 @@ class GitObjectDB(LooseObjectDB): super(GitObjectDB, self).__init__(root_path) self._git = git - def object_info(self, sha): + def info(self, sha): discard, type, size = self._git.get_object_header(sha) return type, size diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py new file mode 100644 index 00000000..325c1444 --- /dev/null +++ b/lib/git/odb/stream.py @@ -0,0 +1,388 @@ +import zlib +from cStringIO import StringIO +from git.utils import make_sha +import errno + +from utils import ( + to_hex_sha, + to_bin_sha + ) + +__all__ = ('FDCompressedSha1Writer', 'DecompressMemMapReader') + + +# ZLIB configuration +# used when compressing objects - 1 to 9 ( slowest ) +Z_BEST_SPEED = 1 + + +#{ ODB Bases + +class ODB_Info(tuple): + """Carries information about an object in an ODB, provdiing information + about the sha of the object, the type_string as well as the uncompressed size + in bytes. + + It can be accessed using tuple notation and using attribute access notation:: + + assert dbi[0] == dbi.sha + assert dbi[1] == dbi.type + assert dbi[2] == dbi.size + + The type is designed to be as lighteight as possible.""" + __slots__ = tuple() + + def __new__(cls, sha, type, size): + return tuple.__new__(cls, (sha, type, size)) + + def __init__(self, sha, type, size): + pass + + #{ Interface + @property + def sha(self): + return self[0] + + @property + def type(self): + return self[1] + + @property + def size(self): + return self[2] + #} END interface + + +class ODB_OStream(ODB_Info): + """Base for object streams retrieved from the database, providing additional + information about the stream. + Generally, ODB streams are read-only as objects are immutable""" + __slots__ = tuple() + + def __new__(cls, sha, type, size, *args, **kwargs): + """Helps with the initialization of subclasses""" + return tuple.__new__(cls, (sha, type, size)) + + def is_compressed(self): + """:return: True if reads of this stream yield zlib compressed data. + :note: this does not imply anything about the actual internal storage. + Hence the data could be uncompressed, but read compressed, or vice versa""" + raise NotImplementedError("To be implemented by subclass") + + +class ODB_IStream(list): + """Represents an input content stream to be fed into the ODB. It is mutable to allow + the ODB to record information about the operations outcome right in this instance. + + It provides interfaces for the ODB_OStream and a StreamReader to allow the instance + to blend in without prior conversion. + + The only method your content stream must support is 'read'""" + __slots__ = tuple() + + def __new__(cls, type, size, stream, sha=None, compressed=False): + list.__new__(cls, (sha, type, size, stream, compressed, None)) + + def __init__(cls, type, size, stream, sha=None, compressed=None): + pass + + #{ Interface + + def hexsha(self): + """:return: our sha, hex encoded, 40 bytes""" + return to_hex_sha(self[0]) + + def binsha(self): + """:return: our sha as binary, 20 bytes""" + return to_bin_sha(self[0]) + + def _error(self): + """:return: the error that occurred when processing the stream, or None""" + return self[5] + + def _set_error(self, exc): + """Set this input stream to the given exc, may be None to reset the error""" + self[5] = exc + + error = property(_error, _set_error) + + #} END interface + + #{ Stream Reader Interface + + def read(self, size=-1): + """Implements a simple stream reader interface, passing the read call on + to our internal stream""" + return self[3].read(size) + + #} END stream reader interface + + #{ interface + + def _set_sha(self, sha): + self[0] = sha + + def _sha(self): + return self[0] + + sha = property(_sha, _set_sha) + + @property + def type(self): + return self[1] + + @property + def size(self): + return self[2] + + #} END odb info interface + + #{ ODB_OStream interface + + def is_compressed(self): + return self[4] + + #} END ODB_OStream interface + + +class InvalidODB_Info(tuple): + """Carries information about a sha identifying an object which is invalid in + the queried database. The exception attribute provides more information about + the cause of the issue""" + __slots__ = tuple() + + def __new__(cls, sha, exc): + return tuple.__new__(cls, (sha, exc)) + + def __init__(self, sha, exc): + pass + + @property + def sha(self): + return self[0] + + @property + def error(self): + """:return: exception instance explaining the failure""" + return self[1] + +class InvalidODB_OStream(InvalidODB_Info): + """Carries information about an invalid ODB stream""" + __slots__ = tuple() + +#} END ODB Bases + + +#{ RO Streams + +class DecompressMemMapReader(ODB_OStream): + """Reads data in chunks from a memory map and decompresses it. The client sees + only the uncompressed data, respective file-like read calls are handling on-demand + buffered decompression accordingly + + A constraint on the total size of bytes is activated, simulating + a logical file within a possibly larger physical memory area + + To read efficiently, you clearly don't want to read individual bytes, instead, + read a few kilobytes at least. + + :note: The chunk-size should be carefully selected as it will involve quite a bit + of string copying due to the way the zlib is implemented. Its very wasteful, + hence we try to find a good tradeoff between allocation time and number of + times we actually allocate. An own zlib implementation would be good here + to better support streamed reading - it would only need to keep the mmap + and decompress it into chunks, thats all ... """ + # __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') + + max_read_size = 512*1024 + + def __init__(self, sha, type, size, m, close_on_deletion): + """Initialize with mmap for stream reading""" + self._m = m + self._zip = zlib.decompressobj() + self._buf = None # buffer of decompressed bytes + self._buflen = 0 # length of bytes in buffer + self._s = 0 # size of uncompressed data to read in total + self._br = 0 # num uncompressed bytes read + self._cws = 0 # start byte of compression window + self._cwe = 0 # end byte of compression window + self._close = close_on_deletion # close the memmap on deletion ? + + def __del__(self): + if self._close: + self._m.close() + # END handle resource freeing + + def initialize(self, size=0): + """Initialize this instance for acting as a read-only stream for size bytes. + :param size: size in bytes to be decompresed before being depleted. + If 0, default object header information is parsed from the data, + returning a tuple of (type_string, uncompressed_size) + If not 0, the size will be used, and None is returned. + :note: must only be called exactly once""" + if size: + self._s = size + return + # END handle size + + # read header + maxb = 512 # should really be enough, cgit uses 8192 I believe + self._s = maxb + hdr = self.read(maxb) + hdrend = hdr.find("\0") + type, size = hdr[:hdrend].split(" ") + self._s = int(size) + + # adjust internal state to match actual header length that we ignore + # The buffer will be depleted first on future reads + self._br = 0 + hdrend += 1 # count terminating \0 + self._buf = StringIO(hdr[hdrend:]) + self._buflen = len(hdr) - hdrend + + return type, size + + def read(self, size=-1): + if size < 1: + size = self._s - self._br + else: + size = min(size, self._s - self._br) + # END clamp size + + if size == 0: + return str() + # END handle depletion + + # protect from memory peaks + # If he tries to read large chunks, our memory patterns get really bad + # as we end up copying a possibly huge chunk from our memory map right into + # memory. This might not even be possible. Nonetheless, try to dampen the + # effect a bit by reading in chunks, returning a huge string in the end. + # Our performance now depends on StringIO. This way we don't need two large + # buffers in peak times, but only one large one in the end which is + # the return buffer + # NO: We don't do it - if the user thinks its best, he is right. If he + # has trouble, he will start reading in chunks. According to our tests + # its still faster if we read 10 Mb at once instead of chunking it. + + # if size > self.max_read_size: + # sio = StringIO() + # while size: + # read_size = min(self.max_read_size, size) + # data = self.read(read_size) + # sio.write(data) + # size -= len(data) + # if len(data) < read_size: + # break + # # END data loop + # sio.seek(0) + # return sio.getvalue() + # # END handle maxread + # + # deplete the buffer, then just continue using the decompress object + # which has an own buffer. We just need this to transparently parse the + # header from the zlib stream + dat = str() + if self._buf: + if self._buflen >= size: + # have enough data + dat = self._buf.read(size) + self._buflen -= size + self._br += size + return dat + else: + dat = self._buf.read() # ouch, duplicates data + size -= self._buflen + self._br += self._buflen + + self._buflen = 0 + self._buf = None + # END handle buffer len + # END handle buffer + + # decompress some data + # Abstract: zlib needs to operate on chunks of our memory map ( which may + # be large ), as it will otherwise and always fill in the 'unconsumed_tail' + # attribute which possible reads our whole map to the end, forcing + # everything to be read from disk even though just a portion was requested. + # As this would be a nogo, we workaround it by passing only chunks of data, + # moving the window into the memory map along as we decompress, which keeps + # the tail smaller than our chunk-size. This causes 'only' the chunk to be + # copied once, and another copy of a part of it when it creates the unconsumed + # tail. We have to use it to hand in the appropriate amount of bytes durin g + # the next read. + tail = self._zip.unconsumed_tail + if tail: + # move the window, make it as large as size demands. For code-clarity, + # we just take the chunk from our map again instead of reusing the unconsumed + # tail. The latter one would safe some memory copying, but we could end up + # with not getting enough data uncompressed, so we had to sort that out as well. + # Now we just assume the worst case, hence the data is uncompressed and the window + # needs to be as large as the uncompressed bytes we want to read. + self._cws = self._cwe - len(tail) + self._cwe = self._cws + size + + + indata = self._m[self._cws:self._cwe] # another copy ... :( + # get the actual window end to be sure we don't use it for computations + self._cwe = self._cws + len(indata) + else: + cws = self._cws + self._cws = self._cwe + self._cwe = cws + size + indata = self._m[self._cws:self._cwe] # ... copy it again :( + # END handle tail + + dcompdat = self._zip.decompress(indata, size) + + self._br += len(dcompdat) + if dat: + dcompdat = dat + dcompdat + + return dcompdat + +#} END RO streams + + +#{ W Streams + +class FDCompressedSha1Writer(object): + """Digests data written to it, making the sha available, then compress the + data and write it to the file descriptor + :note: operates on raw file descriptors + :note: for this to work, you have to use the close-method of this instance""" + __slots__ = ("fd", "sha1", "zip") + + # default exception + exc = IOError("Failed to write all bytes to filedescriptor") + + def __init__(self, fd): + self.fd = fd + self.sha1 = make_sha("") + self.zip = zlib.compressobj(Z_BEST_SPEED) + + def write(self, data): + """:raise IOError: If not all bytes could be written + :return: lenght of incoming data""" + self.sha1.update(data) + cdata = self.zip.compress(data) + bytes_written = write(self.fd, cdata) + if bytes_written != len(cdata): + raise self.exc + return len(data) + + def sha(self, as_hex = False): + """:return: sha so far + :param as_hex: if True, sha will be hex-encoded, binary otherwise""" + if as_hex: + return self.sha1.hexdigest() + return self.sha1.digest() + + def close(self): + remainder = self.zip.flush() + if write(self.fd, remainder) != len(remainder): + raise self.exc + return close(self.fd) + + +#} END W streams diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index fd340962..61565ba9 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -1,10 +1,6 @@ import binascii import os -import zlib -from cStringIO import StringIO -from git.utils import make_sha import errno -from fun import chunk_size __all__ = ('FDSha1Writer', ) @@ -38,218 +34,7 @@ read = os.read write = os.write close = os.close -# ZLIB configuration -# used when compressing objects - 1 to 9 ( slowest ) -Z_BEST_SPEED = 1 #} END Routines -#{ Classes - -class FDCompressedSha1Writer(object): - """Digests data written to it, making the sha available, then compress the - data and write it to the file descriptor - :note: operates on raw file descriptors - :note: for this to work, you have to use the close-method of this instance""" - __slots__ = ("fd", "sha1", "zip") - - # default exception - exc = IOError("Failed to write all bytes to filedescriptor") - - def __init__(self, fd): - self.fd = fd - self.sha1 = make_sha("") - self.zip = zlib.compressobj(Z_BEST_SPEED) - - def write(self, data): - """:raise IOError: If not all bytes could be written - :return: lenght of incoming data""" - self.sha1.update(data) - cdata = self.zip.compress(data) - bytes_written = write(self.fd, cdata) - if bytes_written != len(cdata): - raise self.exc - return len(data) - - def sha(self, as_hex = False): - """:return: sha so far - :param as_hex: if True, sha will be hex-encoded, binary otherwise""" - if as_hex: - return self.sha1.hexdigest() - return self.sha1.digest() - - def close(self): - remainder = self.zip.flush() - if write(self.fd, remainder) != len(remainder): - raise self.exc - return close(self.fd) - - -class DecompressMemMapReader(object): - """Reads data in chunks from a memory map and decompresses it. The client sees - only the uncompressed data, respective file-like read calls are handling on-demand - buffered decompression accordingly - - A constraint on the total size of bytes is activated, simulating - a logical file within a possibly larger physical memory area - - To read efficiently, you clearly don't want to read individual bytes, instead, - read a few kilobytes at least. - - :note: The chunk-size should be carefully selected as it will involve quite a bit - of string copying due to the way the zlib is implemented. Its very wasteful, - hence we try to find a good tradeoff between allocation time and number of - times we actually allocate. An own zlib implementation would be good here - to better support streamed reading - it would only need to keep the mmap - and decompress it into chunks, thats all ... """ - __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') - - max_read_size = 512*1024 - - def __init__(self, m, close_on_deletion): - """Initialize with mmap for stream reading""" - self._m = m - self._zip = zlib.decompressobj() - self._buf = None # buffer of decompressed bytes - self._buflen = 0 # length of bytes in buffer - self._s = 0 # size of uncompressed data to read in total - self._br = 0 # num uncompressed bytes read - self._cws = 0 # start byte of compression window - self._cwe = 0 # end byte of compression window - self._close = close_on_deletion # close the memmap on deletion ? - - def __del__(self): - if self._close: - self._m.close() - # END handle resource freeing - - def initialize(self, size=0): - """Initialize this instance for acting as a read-only stream for size bytes. - :param size: size in bytes to be decompresed before being depleted. - If 0, default object header information is parsed from the data, - returning a tuple of (type_string, uncompressed_size) - If not 0, the size will be used, and None is returned. - :note: must only be called exactly once""" - if size: - self._s = size - return - # END handle size - - # read header - maxb = 512 # should really be enough, cgit uses 8192 I believe - self._s = maxb - hdr = self.read(maxb) - hdrend = hdr.find("\0") - type, size = hdr[:hdrend].split(" ") - self._s = int(size) - - # adjust internal state to match actual header length that we ignore - # The buffer will be depleted first on future reads - self._br = 0 - hdrend += 1 # count terminating \0 - self._buf = StringIO(hdr[hdrend:]) - self._buflen = len(hdr) - hdrend - - return type, size - - def read(self, size=-1): - if size < 1: - size = self._s - self._br - else: - size = min(size, self._s - self._br) - # END clamp size - - if size == 0: - return str() - # END handle depletion - - # protect from memory peaks - # If he tries to read large chunks, our memory patterns get really bad - # as we end up copying a possibly huge chunk from our memory map right into - # memory. This might not even be possible. Nonetheless, try to dampen the - # effect a bit by reading in chunks, returning a huge string in the end. - # Our performance now depends on StringIO. This way we don't need two large - # buffers in peak times, but only one large one in the end which is - # the return buffer - # NO: We don't do it - if the user thinks its best, he is right. If he - # has trouble, he will start reading in chunks. According to our tests - # its still faster if we read 10 Mb at once instead of chunking it. - - # if size > self.max_read_size: - # sio = StringIO() - # while size: - # read_size = min(self.max_read_size, size) - # data = self.read(read_size) - # sio.write(data) - # size -= len(data) - # if len(data) < read_size: - # break - # # END data loop - # sio.seek(0) - # return sio.getvalue() - # # END handle maxread - # - # deplete the buffer, then just continue using the decompress object - # which has an own buffer. We just need this to transparently parse the - # header from the zlib stream - dat = str() - if self._buf: - if self._buflen >= size: - # have enough data - dat = self._buf.read(size) - self._buflen -= size - self._br += size - return dat - else: - dat = self._buf.read() # ouch, duplicates data - size -= self._buflen - self._br += self._buflen - - self._buflen = 0 - self._buf = None - # END handle buffer len - # END handle buffer - - # decompress some data - # Abstract: zlib needs to operate on chunks of our memory map ( which may - # be large ), as it will otherwise and always fill in the 'unconsumed_tail' - # attribute which possible reads our whole map to the end, forcing - # everything to be read from disk even though just a portion was requested. - # As this would be a nogo, we workaround it by passing only chunks of data, - # moving the window into the memory map along as we decompress, which keeps - # the tail smaller than our chunk-size. This causes 'only' the chunk to be - # copied once, and another copy of a part of it when it creates the unconsumed - # tail. We have to use it to hand in the appropriate amount of bytes durin g - # the next read. - tail = self._zip.unconsumed_tail - if tail: - # move the window, make it as large as size demands. For code-clarity, - # we just take the chunk from our map again instead of reusing the unconsumed - # tail. The latter one would safe some memory copying, but we could end up - # with not getting enough data uncompressed, so we had to sort that out as well. - # Now we just assume the worst case, hence the data is uncompressed and the window - # needs to be as large as the uncompressed bytes we want to read. - self._cws = self._cwe - len(tail) - self._cwe = self._cws + size - - - indata = self._m[self._cws:self._cwe] # another copy ... :( - # get the actual window end to be sure we don't use it for computations - self._cwe = self._cws + len(indata) - else: - cws = self._cws - self._cws = self._cwe - self._cwe = cws + size - indata = self._m[self._cws:self._cwe] # ... copy it again :( - # END handle tail - - dcompdat = self._zip.decompress(indata, size) - - self._br += len(dcompdat) - if dat: - dcompdat = dat + dcompdat - - return dcompdat - -#} END classes |