author    Sebastian Thiel <byronimo@gmail.com>    2010-06-04 14:41:15 +0200
committer Sebastian Thiel <byronimo@gmail.com>    2010-06-04 14:41:15 +0200
commit    a1e80445ad5cb6da4c0070d7cb8af89da3b0803b (patch)
tree      ff94069b9d49d5a06576a0838a1bbde1e8c992ae
parent    b01ca6a3e4ae9d944d799743c8ff774e2a7a82b6 (diff)
initial version of new odb design to facilitate a channel based multi-threading implementation of all odb functions
-rw-r--r--  lib/git/objects/base.py                |   8
-rw-r--r--  lib/git/objects/commit.py              |   2
-rw-r--r--  lib/git/odb/db.py                      | 114
-rw-r--r--  lib/git/odb/stream.py                  | 388
-rw-r--r--  lib/git/odb/utils.py                   | 215
-rw-r--r--  test/git/performance/test_commit.py    |   2
-rw-r--r--  test/git/performance/test_streams.py   |   6
-rw-r--r--  test/git/test_commit.py                |   4
-rw-r--r--  test/git/test_odb.py                   |  10
9 files changed, 476 insertions, 273 deletions
diff --git a/lib/git/objects/base.py b/lib/git/objects/base.py
index 446c4406..76384888 100644
--- a/lib/git/objects/base.py
+++ b/lib/git/objects/base.py
@@ -76,10 +76,10 @@ class Object(LazyMixin):
Retrieve object information
"""
if attr == "size":
- typename, self.size = self.repo.odb.object_info(self.sha)
+ typename, self.size = self.repo.odb.info(self.sha)
assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type)
elif attr == "data":
- typename, self.size, stream = self.repo.odb.object(self.sha)
+ typename, self.size, stream = self.repo.odb.stream(self.sha)
self.data = stream.read() # once we have an own odb, we can delay reading
assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type)
else:
@@ -124,14 +124,14 @@ class Object(LazyMixin):
def data_stream(self):
""" :return: File Object compatible stream to the uncompressed raw data of the object
:note: returned streams must be read in order"""
- type, size, stream = self.repo.odb.object(self.sha)
+ type, size, stream = self.repo.odb.stream(self.sha)
return stream
def stream_data(self, ostream):
"""Writes our data directly to the given output stream
:param ostream: File object compatible stream object.
:return: self"""
- type, size, istream = self.repo.odb.object(self.sha)
+ type, size, istream = self.repo.odb.stream(self.sha)
stream_copy(istream, ostream)
return self
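
For orientation, a minimal sketch of the renamed read interface as it is used above; `repo` is assumed to be an initialized git.Repo whose `odb` attribute points at the new database, and `obj` one of its objects:

    # info() answers the header lookup only, stream() additionally yields a
    # read-once stream of the raw object data (mirrors the call sites in base.py).
    typename, size = repo.odb.info(obj.sha)
    typename, size, stream = repo.odb.stream(obj.sha)
    data = stream.read()    # returned streams must be read in order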
diff --git a/lib/git/objects/commit.py b/lib/git/objects/commit.py
index d56ce306..dbc0cf27 100644
--- a/lib/git/objects/commit.py
+++ b/lib/git/objects/commit.py
@@ -346,7 +346,7 @@ class Commit(base.Object, Iterable, diff.Diffable, utils.Traversable, utils.Seri
streamlen = stream.tell()
stream.seek(0)
- new_commit.sha = repo.odb.to_object(cls.type, streamlen, stream, sha_as_hex=True)
+ new_commit.sha = repo.odb.store(cls.type, streamlen, stream, sha_as_hex=True)
if head:
try:
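
The write side follows the same rename. A hedged sketch of serializing and storing a commit with the transitional positional store() signature still used at this point; the `_serialize` call is assumed from the surrounding serialization machinery and is not part of this hunk:

    from cStringIO import StringIO

    stream = StringIO()
    new_commit._serialize(stream)       # assumed: writes the raw commit body
    streamlen = stream.tell()
    stream.seek(0)
    new_commit.sha = repo.odb.store(Commit.type, streamlen, stream, sha_as_hex=True)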
diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py
index 1d1d4c40..7ae8f446 100644
--- a/lib/git/odb/db.py
+++ b/lib/git/odb/db.py
@@ -6,9 +6,12 @@ from git.errors import (
BadObjectType
)
-from utils import (
+from stream import (
DecompressMemMapReader,
- FDCompressedSha1Writer,
+ FDCompressedSha1Writer
+ )
+
+from utils import (
ENOENT,
to_hex_sha,
exists,
@@ -31,7 +34,7 @@ import mmap
import os
-class iObjectDBR(object):
+class ObjectDBR(object):
"""Defines an interface for object database lookup.
Objects are identified either by hex-sha (40 bytes) or
by sha (20 bytes)"""
@@ -48,62 +51,87 @@ class iObjectDBR(object):
:raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
- def object(self, sha):
- """
- :return: tuple(type_string, size_in_bytes, stream) a tuple with object
- information including its type, its size as well as a stream from which its
- contents can be read
+ def info(self, sha):
+ """ :return: ODB_Info instance
:param sha: 40 bytes hexsha or 20 bytes binary sha
:raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
- def object_info(self, sha):
- """
- :return: tuple(type_string, size_in_bytes) tuple with the object's type
- string as well as its size in bytes
+ def info_async(self, input_channel):
+ """Retrieve information of a multitude of objects asynchronously
+ :param input_channel: Channel yielding the sha's of the objects of interest
+ :return: Channel yielding ODB_Info|InvalidODB_Info, in any order"""
+ raise NotImplementedError("To be implemented in subclass")
+
+ def stream(self, sha):
+ """:return: ODB_OStream instance
:param sha: 40 bytes hexsha or 20 bytes binary sha
:raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
+
+ def stream_async(self, input_channel):
+ """Retrieve the ODB_OStream of multiple objects
+ :param input_channel: see ``info``
+ :param max_threads: see ``ObjectDBW.store``
+ :return: Channel yielding ODB_OStream|InvalidODB_OStream instances in any order"""
+ raise NotImplementedError("To be implemented in subclass")
#} END query interface
-class iObjectDBW(object):
+class ObjectDBW(object):
"""Defines an interface to create objects in the database"""
- __slots__ = tuple()
+ __slots__ = "_ostream"
+
+ def __init__(self, *args, **kwargs):
+ self._ostream = None
#{ Edit Interface
+ def set_ostream(self, stream):
+ """Adjusts the stream to which all data should be sent when storing new objects
+ :param stream: if not None, the stream to use, if None the default stream
+ will be used.
+ :return: previously installed stream, or None if there was no override
+ :raise TypeError: if the stream doesn't have the supported functionality"""
+ cstream = self._ostream
+ self._ostream = stream
+ return cstream
+
+ def ostream(self):
+ """:return: overridden output stream this instance will write to, or None
+ if it will write to the default stream"""
+ return self._ostream
- def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True):
+ def store(self, istream):
"""Create a new object in the database
- :return: the sha identifying the object in the database
- :param type: type string identifying the object
- :param size: size of the data to read from stream
- :param stream: stream providing the data
- :param dry_run: if True, the object database will not actually be changed
- :param sha_as_hex: if True, the returned sha identifying the object will be
- hex encoded, not binary
+ :return: the input istream object with its sha set to its corresponding value
+ :param istream: ODB_IStream compatible instance. If its sha is already set
+ to a value, the object will just be stored in our database format,
+ in which case the input stream is expected to be in object format ( header + contents ).
:raise IOError: if data could not be written"""
raise NotImplementedError("To be implemented in subclass")
- def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0):
- """Create multiple new objects in the database
- :return: sequence of shas identifying the created objects in the order in which
- they where given.
- :param iter_info: iterable yielding tuples containing the type_string
- size_in_bytes and the steam with the content data.
- :param dry_run: see ``to_object``
- :param sha_as_hex: see ``to_object``
- :param max_threads: if < 1, any number of threads may be started while processing
- the request, otherwise the given number of threads will be started.
- :raise IOError: if data could not be written"""
+ def store_async(self, input_channel):
+ """Create multiple new objects in the database asynchronously. The method will
+ return right away, returning an output channel which receives the results as
+ they are computed.
+
+ :return: Channel yielding your ODB_IStream which served as input, in any order.
+ The IStream's sha will be set to the sha it received during the process,
+ or its error attribute will be set to the exception describing the failure.
+ :param input_channel: Channel yielding ODB_IStream instances.
+ As the same instances will be used in the output channel, you can create a
+ mapping from id(istream) to istream.
+ :note: As some ODB implementations implement this operation atomically, they might
+ abort the whole operation if one item could not be processed. Hence check how
+ many items have actually been produced."""
# a trivial implementation, ignoring the threads for now
# TODO: add configuration to the class to determine whether we may
# actually use multiple threads, default False of course. If the add
shas = list()
for args in iter_info:
- shas.append(self.to_object(dry_run=dry_run, sha_as_hex=sha_as_hex, *args))
+ shas.append(self.store(dry_run=dry_run, sha_as_hex=sha_as_hex, *args))
return shas
-
+
#} END edit interface
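
A minimal sketch of the intended ObjectDBW usage as the docstrings above describe it; `db` stands for any concrete writer and `istream` for an ODB_IStream-like object, both illustrative names rather than code from this diff:

    # Temporarily route all writes through a custom output stream.
    prev = db.set_ostream(my_ostream)       # my_ostream: assumed file-like target
    try:
        istream = db.store(istream)         # returns the input istream with .sha set
    finally:
        db.set_ostream(prev)                # restore the previously installed stream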
@@ -118,6 +146,7 @@ class FileDBBase(object):
:raise InvalidDBRoot:
:note: The base will perform basic checking for accessibility, but the subclass
is required to verify that the root_path contains the database structure it needs"""
+ super(FileDBBase, self).__init__()
if not os.path.isdir(root_path):
raise InvalidDBRoot(root_path)
self._root_path = root_path
@@ -141,7 +170,7 @@ class FileDBBase(object):
#} END utilities
-class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
+class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW):
"""A database which operates on loose object files"""
__slots__ = ('_hexsha_to_file', '_fd_open_flags')
# CONFIGURATION
@@ -210,7 +239,7 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
os.close(fd)
# END assure file is closed
- def object_info(self, sha):
+ def info(self, sha):
m = self._map_loose_object(sha)
try:
return loose_object_header_info(m)
@@ -233,8 +262,9 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
return False
# END check existance
- def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True):
+ def store(self, istream):
# open a tmp file to write the data to
+ # todo: implement ostream properly
fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path)
writer = FDCompressedSha1Writer(fd)
@@ -269,11 +299,11 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
return sha
-class PackedDB(FileDBBase, iObjectDBR):
+class PackedDB(FileDBBase, ObjectDBR):
"""A database operating on a set of object packs"""
-class CompoundDB(iObjectDBR):
+class CompoundDB(ObjectDBR):
"""A database which delegates calls to sub-databases"""
@@ -281,7 +311,7 @@ class ReferenceDB(CompoundDB):
"""A database consisting of database referred to in a file"""
-#class GitObjectDB(CompoundDB, iObjectDBW):
+#class GitObjectDB(CompoundDB, ObjectDBW):
class GitObjectDB(LooseObjectDB):
"""A database representing the default git object store, which includes loose
objects, pack files and an alternates file
@@ -296,7 +326,7 @@ class GitObjectDB(LooseObjectDB):
super(GitObjectDB, self).__init__(root_path)
self._git = git
- def object_info(self, sha):
+ def info(self, sha):
discard, type, size = self._git.get_object_header(sha)
return type, size
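
Reading through LooseObjectDB directly then looks roughly like this; the objects-directory path is a placeholder for an actual repository layout:

    ldb = LooseObjectDB('/path/to/repo/.git/objects')
    if ldb.has_object(sha):
        typename, size = ldb.info(sha)              # header only
        typename, size, stream = ldb.stream(sha)    # header plus data stream
        data = stream.read()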
diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py
new file mode 100644
index 00000000..325c1444
--- /dev/null
+++ b/lib/git/odb/stream.py
@@ -0,0 +1,388 @@
+import zlib
+from cStringIO import StringIO
+from git.utils import make_sha
+import errno
+
+from utils import (
+ to_hex_sha,
+ to_bin_sha, write, close  # write/close: os-level helpers used by FDCompressedSha1Writer
+ )
+
+__all__ = ('FDCompressedSha1Writer', 'DecompressMemMapReader')
+
+
+# ZLIB configuration
+# used when compressing objects - 1 to 9 ( slowest )
+Z_BEST_SPEED = 1
+
+
+#{ ODB Bases
+
+class ODB_Info(tuple):
+ """Carries information about an object in an ODB, provdiing information
+ about the sha of the object, the type_string as well as the uncompressed size
+ in bytes.
+
+ It can be accessed using tuple notation and using attribute access notation::
+
+ assert dbi[0] == dbi.sha
+ assert dbi[1] == dbi.type
+ assert dbi[2] == dbi.size
+
+ The type is designed to be as lightweight as possible."""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, type, size):
+ return tuple.__new__(cls, (sha, type, size))
+
+ def __init__(self, sha, type, size):
+ pass
+
+ #{ Interface
+ @property
+ def sha(self):
+ return self[0]
+
+ @property
+ def type(self):
+ return self[1]
+
+ @property
+ def size(self):
+ return self[2]
+ #} END interface
+
+
+class ODB_OStream(ODB_Info):
+ """Base for object streams retrieved from the database, providing additional
+ information about the stream.
+ Generally, ODB streams are read-only as objects are immutable"""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, type, size, *args, **kwargs):
+ """Helps with the initialization of subclasses"""
+ return tuple.__new__(cls, (sha, type, size))
+
+ def is_compressed(self):
+ """:return: True if reads of this stream yield zlib compressed data.
+ :note: this does not imply anything about the actual internal storage.
+ Hence the data could be uncompressed, but read compressed, or vice versa"""
+ raise NotImplementedError("To be implemented by subclass")
+
+
+class ODB_IStream(list):
+ """Represents an input content stream to be fed into the ODB. It is mutable to allow
+ the ODB to record information about the operation's outcome right in this instance.
+
+ It provides interfaces for the ODB_OStream and a StreamReader to allow the instance
+ to blend in without prior conversion.
+
+ The only method your content stream must support is 'read'"""
+ __slots__ = tuple()
+
+ def __new__(cls, type, size, stream, sha=None, compressed=False):
+ return list.__new__(cls)
+
+ def __init__(self, type, size, stream, sha=None, compressed=False):
+ list.__init__(self, (sha, type, size, stream, compressed, None))
+
+ #{ Interface
+
+ def hexsha(self):
+ """:return: our sha, hex encoded, 40 bytes"""
+ return to_hex_sha(self[0])
+
+ def binsha(self):
+ """:return: our sha as binary, 20 bytes"""
+ return to_bin_sha(self[0])
+
+ def _error(self):
+ """:return: the error that occurred when processing the stream, or None"""
+ return self[5]
+
+ def _set_error(self, exc):
+ """Set this input stream to the given exc, may be None to reset the error"""
+ self[5] = exc
+
+ error = property(_error, _set_error)
+
+ #} END interface
+
+ #{ Stream Reader Interface
+
+ def read(self, size=-1):
+ """Implements a simple stream reader interface, passing the read call on
+ to our internal stream"""
+ return self[3].read(size)
+
+ #} END stream reader interface
+
+ #{ interface
+
+ def _set_sha(self, sha):
+ self[0] = sha
+
+ def _sha(self):
+ return self[0]
+
+ sha = property(_sha, _set_sha)
+
+ @property
+ def type(self):
+ return self[1]
+
+ @property
+ def size(self):
+ return self[2]
+
+ #} END odb info interface
+
+ #{ ODB_OStream interface
+
+ def is_compressed(self):
+ return self[4]
+
+ #} END ODB_OStream interface
+
+
+class InvalidODB_Info(tuple):
+ """Carries information about a sha identifying an object which is invalid in
+ the queried database. The exception attribute provides more information about
+ the cause of the issue"""
+ __slots__ = tuple()
+
+ def __new__(cls, sha, exc):
+ return tuple.__new__(cls, (sha, exc))
+
+ def __init__(self, sha, exc):
+ pass
+
+ @property
+ def sha(self):
+ return self[0]
+
+ @property
+ def error(self):
+ """:return: exception instance explaining the failure"""
+ return self[1]
+
+class InvalidODB_OStream(InvalidODB_Info):
+ """Carries information about an invalid ODB stream"""
+ __slots__ = tuple()
+
+#} END ODB Bases
+
+
+#{ RO Streams
+
+class DecompressMemMapReader(ODB_OStream):
+ """Reads data in chunks from a memory map and decompresses it. The client sees
+ only the uncompressed data; file-like read calls handle on-demand
+ buffered decompression accordingly.
+
+ A constraint on the total size of bytes is activated, simulating
+ a logical file within a possibly larger physical memory area
+
+ To read efficiently, you clearly don't want to read individual bytes, instead,
+ read a few kilobytes at least.
+
+ :note: The chunk-size should be carefully selected as it will involve quite a bit
+ of string copying due to the way zlib is implemented. It's very wasteful,
+ hence we try to find a good tradeoff between allocation time and the number of
+ times we actually allocate. A custom zlib implementation would be good here
+ to better support streamed reading - it would only need to keep the mmap
+ and decompress it into chunks, that's all ... """
+ # __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
+
+ max_read_size = 512*1024
+
+ def __init__(self, sha, type, size, m, close_on_deletion):
+ """Initialize with mmap for stream reading"""
+ self._m = m
+ self._zip = zlib.decompressobj()
+ self._buf = None # buffer of decompressed bytes
+ self._buflen = 0 # length of bytes in buffer
+ self._s = 0 # size of uncompressed data to read in total
+ self._br = 0 # num uncompressed bytes read
+ self._cws = 0 # start byte of compression window
+ self._cwe = 0 # end byte of compression window
+ self._close = close_on_deletion # close the memmap on deletion ?
+
+ def __del__(self):
+ if self._close:
+ self._m.close()
+ # END handle resource freeing
+
+ def initialize(self, size=0):
+ """Initialize this instance for acting as a read-only stream for size bytes.
+ :param size: size in bytes to be decompressed before being depleted.
+ If 0, default object header information is parsed from the data,
+ returning a tuple of (type_string, uncompressed_size)
+ If not 0, the size will be used, and None is returned.
+ :note: must only be called exactly once"""
+ if size:
+ self._s = size
+ return
+ # END handle size
+
+ # read header
+ maxb = 512 # should really be enough, cgit uses 8192 I believe
+ self._s = maxb
+ hdr = self.read(maxb)
+ hdrend = hdr.find("\0")
+ type, size = hdr[:hdrend].split(" ")
+ self._s = int(size)
+
+ # adjust internal state to match actual header length that we ignore
+ # The buffer will be depleted first on future reads
+ self._br = 0
+ hdrend += 1 # count terminating \0
+ self._buf = StringIO(hdr[hdrend:])
+ self._buflen = len(hdr) - hdrend
+
+ return type, size
+
+ def read(self, size=-1):
+ if size < 1:
+ size = self._s - self._br
+ else:
+ size = min(size, self._s - self._br)
+ # END clamp size
+
+ if size == 0:
+ return str()
+ # END handle depletion
+
+ # protect from memory peaks
+ # If he tries to read large chunks, our memory patterns get really bad
+ # as we end up copying a possibly huge chunk from our memory map right into
+ # memory. This might not even be possible. Nonetheless, try to dampen the
+ # effect a bit by reading in chunks, returning a huge string in the end.
+ # Our performance now depends on StringIO. This way we don't need two large
+ # buffers in peak times, but only one large one in the end which is
+ # the return buffer
+ # NO: We don't do it - if the user thinks it's best, he is right. If he
+ # has trouble, he will start reading in chunks. According to our tests
+ # it's still faster if we read 10 Mb at once instead of chunking it.
+
+ # if size > self.max_read_size:
+ # sio = StringIO()
+ # while size:
+ # read_size = min(self.max_read_size, size)
+ # data = self.read(read_size)
+ # sio.write(data)
+ # size -= len(data)
+ # if len(data) < read_size:
+ # break
+ # # END data loop
+ # sio.seek(0)
+ # return sio.getvalue()
+ # # END handle maxread
+ #
+ # deplete the buffer, then just continue using the decompress object
+ # which has an own buffer. We just need this to transparently parse the
+ # header from the zlib stream
+ dat = str()
+ if self._buf:
+ if self._buflen >= size:
+ # have enough data
+ dat = self._buf.read(size)
+ self._buflen -= size
+ self._br += size
+ return dat
+ else:
+ dat = self._buf.read() # ouch, duplicates data
+ size -= self._buflen
+ self._br += self._buflen
+
+ self._buflen = 0
+ self._buf = None
+ # END handle buffer len
+ # END handle buffer
+
+ # decompress some data
+ # Abstract: zlib needs to operate on chunks of our memory map ( which may
+ # be large ), as it will otherwise and always fill in the 'unconsumed_tail'
+ attribute which possibly reads our whole map to the end, forcing
+ everything to be read from disk even though just a portion was requested.
+ As this would be a no-go, we work around it by passing only chunks of data,
+ # moving the window into the memory map along as we decompress, which keeps
+ # the tail smaller than our chunk-size. This causes 'only' the chunk to be
+ # copied once, and another copy of a part of it when it creates the unconsumed
+ tail. We have to use it to hand in the appropriate amount of bytes during
+ # the next read.
+ tail = self._zip.unconsumed_tail
+ if tail:
+ # move the window, make it as large as size demands. For code-clarity,
+ # we just take the chunk from our map again instead of reusing the unconsumed
+ tail. The latter one would save some memory copying, but we could end up
+ # with not getting enough data uncompressed, so we had to sort that out as well.
+ # Now we just assume the worst case, hence the data is uncompressed and the window
+ # needs to be as large as the uncompressed bytes we want to read.
+ self._cws = self._cwe - len(tail)
+ self._cwe = self._cws + size
+
+
+ indata = self._m[self._cws:self._cwe] # another copy ... :(
+ # get the actual window end to be sure we don't use it for computations
+ self._cwe = self._cws + len(indata)
+ else:
+ cws = self._cws
+ self._cws = self._cwe
+ self._cwe = cws + size
+ indata = self._m[self._cws:self._cwe] # ... copy it again :(
+ # END handle tail
+
+ dcompdat = self._zip.decompress(indata, size)
+
+ self._br += len(dcompdat)
+ if dat:
+ dcompdat = dat + dcompdat
+
+ return dcompdat
+
+#} END RO streams
+
+
+#{ W Streams
+
+class FDCompressedSha1Writer(object):
+ """Digests data written to it, making the sha available, then compress the
+ data and write it to the file descriptor
+ :note: operates on raw file descriptors
+ :note: for this to work, you have to use the close-method of this instance"""
+ __slots__ = ("fd", "sha1", "zip")
+
+ # default exception
+ exc = IOError("Failed to write all bytes to filedescriptor")
+
+ def __init__(self, fd):
+ self.fd = fd
+ self.sha1 = make_sha("")
+ self.zip = zlib.compressobj(Z_BEST_SPEED)
+
+ def write(self, data):
+ """:raise IOError: If not all bytes could be written
+ :return: length of incoming data"""
+ self.sha1.update(data)
+ cdata = self.zip.compress(data)
+ bytes_written = write(self.fd, cdata)
+ if bytes_written != len(cdata):
+ raise self.exc
+ return len(data)
+
+ def sha(self, as_hex = False):
+ """:return: sha so far
+ :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
+ if as_hex:
+ return self.sha1.hexdigest()
+ return self.sha1.digest()
+
+ def close(self):
+ remainder = self.zip.flush()
+ if write(self.fd, remainder) != len(remainder):
+ raise self.exc
+ return close(self.fd)
+
+
+#} END W streams
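
To see the two new stream classes cooperate, a self-contained sketch under the assumption that a plain string may stand in for the memory map (both support slicing); note that FDCompressedSha1Writer relies on the os-level write/close helpers defined in the utils module:

    import os
    import tempfile

    # Write side: sha1 over the plain data, zlib-compressed bytes to the descriptor.
    fd, tmp_path = tempfile.mkstemp(prefix='odb-example')
    writer = FDCompressedSha1Writer(fd)
    writer.write("blob 11\0hello world")    # loose object format: "<type> <size>\0<data>"
    writer.close()                          # flushes zlib and closes the descriptor
    hexsha = writer.sha(as_hex=True)

    # Read side: feed the compressed bytes back in and stream them out again.
    m = open(tmp_path, 'rb').read()         # a plain buffer stands in for the mmap
    reader = DecompressMemMapReader(hexsha, None, 0, m, close_on_deletion=False)
    typename, size = reader.initialize()    # parses the "blob 11" header
    assert reader.read() == "hello world"
    os.remove(tmp_path)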
diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py
index fd340962..61565ba9 100644
--- a/lib/git/odb/utils.py
+++ b/lib/git/odb/utils.py
@@ -1,10 +1,6 @@
import binascii
import os
-import zlib
-from cStringIO import StringIO
-from git.utils import make_sha
import errno
-from fun import chunk_size
__all__ = ('FDSha1Writer', )
@@ -38,218 +34,7 @@ read = os.read
write = os.write
close = os.close
-# ZLIB configuration
-# used when compressing objects - 1 to 9 ( slowest )
-Z_BEST_SPEED = 1
#} END Routines
-#{ Classes
-
-class FDCompressedSha1Writer(object):
- """Digests data written to it, making the sha available, then compress the
- data and write it to the file descriptor
- :note: operates on raw file descriptors
- :note: for this to work, you have to use the close-method of this instance"""
- __slots__ = ("fd", "sha1", "zip")
-
- # default exception
- exc = IOError("Failed to write all bytes to filedescriptor")
-
- def __init__(self, fd):
- self.fd = fd
- self.sha1 = make_sha("")
- self.zip = zlib.compressobj(Z_BEST_SPEED)
-
- def write(self, data):
- """:raise IOError: If not all bytes could be written
- :return: lenght of incoming data"""
- self.sha1.update(data)
- cdata = self.zip.compress(data)
- bytes_written = write(self.fd, cdata)
- if bytes_written != len(cdata):
- raise self.exc
- return len(data)
-
- def sha(self, as_hex = False):
- """:return: sha so far
- :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
- if as_hex:
- return self.sha1.hexdigest()
- return self.sha1.digest()
-
- def close(self):
- remainder = self.zip.flush()
- if write(self.fd, remainder) != len(remainder):
- raise self.exc
- return close(self.fd)
-
-
-class DecompressMemMapReader(object):
- """Reads data in chunks from a memory map and decompresses it. The client sees
- only the uncompressed data, respective file-like read calls are handling on-demand
- buffered decompression accordingly
-
- A constraint on the total size of bytes is activated, simulating
- a logical file within a possibly larger physical memory area
-
- To read efficiently, you clearly don't want to read individual bytes, instead,
- read a few kilobytes at least.
-
- :note: The chunk-size should be carefully selected as it will involve quite a bit
- of string copying due to the way the zlib is implemented. Its very wasteful,
- hence we try to find a good tradeoff between allocation time and number of
- times we actually allocate. An own zlib implementation would be good here
- to better support streamed reading - it would only need to keep the mmap
- and decompress it into chunks, thats all ... """
- __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
-
- max_read_size = 512*1024
-
- def __init__(self, m, close_on_deletion):
- """Initialize with mmap for stream reading"""
- self._m = m
- self._zip = zlib.decompressobj()
- self._buf = None # buffer of decompressed bytes
- self._buflen = 0 # length of bytes in buffer
- self._s = 0 # size of uncompressed data to read in total
- self._br = 0 # num uncompressed bytes read
- self._cws = 0 # start byte of compression window
- self._cwe = 0 # end byte of compression window
- self._close = close_on_deletion # close the memmap on deletion ?
-
- def __del__(self):
- if self._close:
- self._m.close()
- # END handle resource freeing
-
- def initialize(self, size=0):
- """Initialize this instance for acting as a read-only stream for size bytes.
- :param size: size in bytes to be decompresed before being depleted.
- If 0, default object header information is parsed from the data,
- returning a tuple of (type_string, uncompressed_size)
- If not 0, the size will be used, and None is returned.
- :note: must only be called exactly once"""
- if size:
- self._s = size
- return
- # END handle size
-
- # read header
- maxb = 512 # should really be enough, cgit uses 8192 I believe
- self._s = maxb
- hdr = self.read(maxb)
- hdrend = hdr.find("\0")
- type, size = hdr[:hdrend].split(" ")
- self._s = int(size)
-
- # adjust internal state to match actual header length that we ignore
- # The buffer will be depleted first on future reads
- self._br = 0
- hdrend += 1 # count terminating \0
- self._buf = StringIO(hdr[hdrend:])
- self._buflen = len(hdr) - hdrend
-
- return type, size
-
- def read(self, size=-1):
- if size < 1:
- size = self._s - self._br
- else:
- size = min(size, self._s - self._br)
- # END clamp size
-
- if size == 0:
- return str()
- # END handle depletion
-
- # protect from memory peaks
- # If he tries to read large chunks, our memory patterns get really bad
- # as we end up copying a possibly huge chunk from our memory map right into
- # memory. This might not even be possible. Nonetheless, try to dampen the
- # effect a bit by reading in chunks, returning a huge string in the end.
- # Our performance now depends on StringIO. This way we don't need two large
- # buffers in peak times, but only one large one in the end which is
- # the return buffer
- # NO: We don't do it - if the user thinks its best, he is right. If he
- # has trouble, he will start reading in chunks. According to our tests
- # its still faster if we read 10 Mb at once instead of chunking it.
-
- # if size > self.max_read_size:
- # sio = StringIO()
- # while size:
- # read_size = min(self.max_read_size, size)
- # data = self.read(read_size)
- # sio.write(data)
- # size -= len(data)
- # if len(data) < read_size:
- # break
- # # END data loop
- # sio.seek(0)
- # return sio.getvalue()
- # # END handle maxread
- #
- # deplete the buffer, then just continue using the decompress object
- # which has an own buffer. We just need this to transparently parse the
- # header from the zlib stream
- dat = str()
- if self._buf:
- if self._buflen >= size:
- # have enough data
- dat = self._buf.read(size)
- self._buflen -= size
- self._br += size
- return dat
- else:
- dat = self._buf.read() # ouch, duplicates data
- size -= self._buflen
- self._br += self._buflen
-
- self._buflen = 0
- self._buf = None
- # END handle buffer len
- # END handle buffer
-
- # decompress some data
- # Abstract: zlib needs to operate on chunks of our memory map ( which may
- # be large ), as it will otherwise and always fill in the 'unconsumed_tail'
- # attribute which possible reads our whole map to the end, forcing
- # everything to be read from disk even though just a portion was requested.
- # As this would be a nogo, we workaround it by passing only chunks of data,
- # moving the window into the memory map along as we decompress, which keeps
- # the tail smaller than our chunk-size. This causes 'only' the chunk to be
- # copied once, and another copy of a part of it when it creates the unconsumed
- # tail. We have to use it to hand in the appropriate amount of bytes durin g
- # the next read.
- tail = self._zip.unconsumed_tail
- if tail:
- # move the window, make it as large as size demands. For code-clarity,
- # we just take the chunk from our map again instead of reusing the unconsumed
- # tail. The latter one would safe some memory copying, but we could end up
- # with not getting enough data uncompressed, so we had to sort that out as well.
- # Now we just assume the worst case, hence the data is uncompressed and the window
- # needs to be as large as the uncompressed bytes we want to read.
- self._cws = self._cwe - len(tail)
- self._cwe = self._cws + size
-
-
- indata = self._m[self._cws:self._cwe] # another copy ... :(
- # get the actual window end to be sure we don't use it for computations
- self._cwe = self._cws + len(indata)
- else:
- cws = self._cws
- self._cws = self._cwe
- self._cwe = cws + size
- indata = self._m[self._cws:self._cwe] # ... copy it again :(
- # END handle tail
-
- dcompdat = self._zip.decompress(indata, size)
-
- self._br += len(dcompdat)
- if dat:
- dcompdat = dat + dcompdat
-
- return dcompdat
-
-#} END classes
diff --git a/test/git/performance/test_commit.py b/test/git/performance/test_commit.py
index 2398c93d..bca3ad8b 100644
--- a/test/git/performance/test_commit.py
+++ b/test/git/performance/test_commit.py
@@ -72,7 +72,7 @@ class TestPerformance(TestBigRepoRW):
assert_commit_serialization(self.gitrwrepo, self.head_sha_2k, True)
rwrepo = self.gitrwrepo
- make_object = rwrepo.odb.to_object
+ make_object = rwrepo.odb.store
# direct serialization - deserialization can be tested afterwards
# serialization is probably limited on IO
hc = rwrepo.commit(self.head_sha_2k)
diff --git a/test/git/performance/test_streams.py b/test/git/performance/test_streams.py
index d31bee14..30fd8048 100644
--- a/test/git/performance/test_streams.py
+++ b/test/git/performance/test_streams.py
@@ -51,7 +51,7 @@ class TestObjDBPerformance(TestBigRepoR):
# writing - due to the compression it will seem faster than it is
st = time()
- sha = ldb.to_object('blob', size, stream)
+ sha = ldb.store('blob', size, stream)
elapsed_add = time() - st
assert ldb.has_object(sha)
db_file = ldb.readable_db_object_path(sha)
@@ -63,7 +63,7 @@ class TestObjDBPerformance(TestBigRepoR):
# reading all at once
st = time()
- type, size, shastream = ldb.object(sha)
+ type, size, shastream = ldb.stream(sha)
shadata = shastream.read()
elapsed_readall = time() - st
@@ -76,7 +76,7 @@ class TestObjDBPerformance(TestBigRepoR):
cs = 512*1000
chunks = list()
st = time()
- type, size, shastream = ldb.object(sha)
+ type, size, shastream = ldb.stream(sha)
while True:
data = shastream.read(cs)
chunks.append(data)
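
The chunked variant of the same read, as the performance test above exercises it (the chunk size is simply the one the test uses):

    cs = 512 * 1000
    typename, size, shastream = ldb.stream(sha)
    chunks = list()
    while True:
        data = shastream.read(cs)
        chunks.append(data)
        if len(data) < cs:
            break
    # ''.join(chunks) now holds the complete uncompressed object data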
diff --git a/test/git/test_commit.py b/test/git/test_commit.py
index a5f184e6..e914b9a7 100644
--- a/test/git/test_commit.py
+++ b/test/git/test_commit.py
@@ -31,7 +31,7 @@ def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False)
streamlen = stream.tell()
stream.seek(0)
- csha = rwrepo.odb.to_object(Commit.type, streamlen, stream)
+ csha = rwrepo.odb.store(Commit.type, streamlen, stream)
assert csha == cm.sha
nc = Commit(rwrepo, Commit.NULL_HEX_SHA, cm.tree.sha,
@@ -45,7 +45,7 @@ def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False)
ns += 1
streamlen = stream.tell()
stream.seek(0)
- nc.sha = rwrepo.odb.to_object(Commit.type, streamlen, stream)
+ nc.sha = rwrepo.odb.store(Commit.type, streamlen, stream)
# if it worked, we have exactly the same contents !
assert nc.sha == cm.sha
diff --git a/test/git/test_odb.py b/test/git/test_odb.py
index b2840719..80597df6 100644
--- a/test/git/test_odb.py
+++ b/test/git/test_odb.py
@@ -18,26 +18,26 @@ class TestDB(TestBase):
all_data = (two_lines, )
def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to iObjectDBW
+ """General tests to verify object writing, compatible to ObjectDBW
:note: requires write access to the database"""
# start in dry-run mode
for dry_run in range(1, -1, -1):
for data in self.all_data:
for hex_sha in range(2):
- sha = db.to_object(Blob.type, len(data), StringIO(data), dry_run, hex_sha)
+ sha = db.store(Blob.type, len(data), StringIO(data), dry_run, hex_sha)
assert db.has_object(sha) != dry_run
assert len(sha) == 20 + hex_sha * 20
# verify data - the slow way, we want to run code
if not dry_run:
- type, size = db.object_info(sha)
+ type, size = db.info(sha)
assert Blob.type == type
assert size == len(data)
- type, size, stream = db.object(sha)
+ type, size, stream = db.stream(sha)
assert stream.read() == data
else:
- self.failUnlessRaises(BadObject, db.object_info, sha)
+ self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.object, sha)
# END for each sha type
# END for each data set
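
Condensed, the round trip this test performs against any writable database `db` looks as follows, with `dry_run` and `hex_sha` iterated exactly as in the loops above and the transitional positional store() signature kept:

    sha = db.store(Blob.type, len(data), StringIO(data), dry_run, hex_sha)
    assert db.has_object(sha) != dry_run
    if not dry_run:
        typename, size = db.info(sha)
        typename, size, stream = db.stream(sha)
        assert stream.read() == data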