Diffstat (limited to 'lib/git/odb')
-rw-r--r--  lib/git/odb/db.py     114
-rw-r--r--  lib/git/odb/fun.py    114
-rw-r--r--  lib/git/odb/utils.py  147
3 files changed, 322 insertions, 53 deletions
diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py
index 1248a3f4..5c50a512 100644
--- a/lib/git/odb/db.py
+++ b/lib/git/odb/db.py
@@ -1,17 +1,18 @@
"""Contains implementations of database retrieveing objects"""
-import os
+from git.utils import IndexFileSHA1Writer
from git.errors import (
InvalidDBRoot,
- BadObject
+ BadObject,
+ BadObjectType
)
-from git.utils import IndexFileSHA1Writer
from utils import (
- getsize,
+ DecompressMemMapReader,
+ FDCompressedSha1Writer,
+ ENOENT,
to_hex_sha,
exists,
hex_to_bin,
- FDCompressedSha1Writer,
isdir,
mkdir,
rename,
@@ -19,8 +20,15 @@ from utils import (
join
)
+from fun import (
+ chunk_size,
+ loose_object_header_info,
+ write_object
+ )
+
import tempfile
import mmap
+import os
class iObjectDBR(object):
@@ -36,7 +44,8 @@ class iObjectDBR(object):
def has_object(self, sha):
"""
:return: True if the object identified by the given 40 byte hexsha or 20 bytes
- binary sha is contained in the database"""
+ binary sha is contained in the database
+ :raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
def object(self, sha):
@@ -44,14 +53,16 @@ class iObjectDBR(object):
:return: tuple(type_string, size_in_bytes, stream) a tuple with object
information including its type, its size as well as a stream from which its
contents can be read
- :param sha: 40 bytes hexsha or 20 bytes binary sha """
+ :param sha: 40 bytes hexsha or 20 bytes binary sha
+ :raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
def object_info(self, sha):
"""
:return: tuple(type_string, size_in_bytes) tuple with the object's type
string as well as its size in bytes
- :param sha: 40 bytes hexsha or 20 bytes binary sha"""
+ :param sha: 40 bytes hexsha or 20 bytes binary sha
+ :raise BadObject:"""
raise NotImplementedError("To be implemented in subclass")
#} END query interface
@@ -70,7 +81,8 @@ class iObjectDBW(object):
:param stream: stream providing the data
:param dry_run: if True, the object database will not actually be changed
:param sha_as_hex: if True, the returned sha identifying the object will be
- hex encoded, not binary"""
+ hex encoded, not binary
+ :raise IOError: if data could not be written"""
raise NotImplementedError("To be implemented in subclass")
def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0):
@@ -82,7 +94,8 @@ class iObjectDBW(object):
:param dry_run: see ``to_obj``
:param sha_as_hex: see ``to_obj``
:param max_threads: if < 1, any number of threads may be started while processing
- the request, otherwise the given number of threads will be started."""
+ the request, otherwise the given number of threads will be started.
+ :raise IOError: if data could not be written"""
# a trivial implementation, ignoring the threads for now
# TODO: add configuration to the class to determine whether we may
# actually use multiple threads, default False of course. If the add
@@ -130,15 +143,19 @@ class FileDBBase(object):
class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
"""A database which operates on loose object files"""
- __slots__ = ('_hexsha_to_file', )
-
+ __slots__ = ('_hexsha_to_file', '_fd_open_flags')
# CONFIGURATION
# chunks in which data will be copied between streams
- stream_chunk_size = 1000*1000
+ stream_chunk_size = chunk_size
+
def __init__(self, root_path):
super(LooseObjectDB, self).__init__(root_path)
self._hexsha_to_file = dict()
+ # Additional flags - might be set to 0 after the first failure
+ # Depending on the root, this might work for some mounts but not for others,
+ # which is why it is kept per instance
+ self._fd_open_flags = os.O_NOATIME
#{ Interface
def object_path(self, hexsha):
@@ -167,36 +184,46 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
#} END interface
- def _object_header_info(self, mmap):
- """:return: tuple(type_string, uncompressed_size_in_bytes
- :param mmap: newly mapped memory map at position 0. It will be
- seeked to the actual start of the object contents, which can be used
- to initialize a zlib decompress object."""
- raise NotImplementedError("todo")
-
- def _map_object(self, sha):
+ def _map_loose_object(self, sha):
"""
- :return: tuple(file, mmap) tuple with an opened file for reading, and
- a memory map of that file"""
- db_path = self.readable_db_object_path(to_hex_sha(sha))
- f = open(db_path, 'rb')
- m = mmap.mmap(f.fileno(), getsize(db_path), access=mmap.ACCESS_READ)
- return (f, m)
+ :return: memory map of that file to allow random read access
+ :raise BadObject: if object could not be located"""
+ db_path = self.db_path(self.object_path(to_hex_sha(sha)))
+ try:
+ fd = os.open(db_path, os.O_RDONLY|self._fd_open_flags)
+ except OSError,e:
+ if e.errno != ENOENT:
+ # try again without noatime
+ try:
+ fd = os.open(db_path, os.O_RDONLY)
+ except OSError:
+ raise BadObject(to_hex_sha(sha))
+ # didn't work because of our flag, don't try it again
+ self._fd_open_flags = 0
+ else:
+ raise BadObject(to_hex_sha(sha))
+ # END handle error
+ # END exception handling
+ try:
+ return mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
+ finally:
+ os.close(fd)
+ # END assure file is closed
def object_info(self, sha):
- f, m = self._map_object(sha)
+ m = self._map_loose_object(sha)
try:
- type, size = self._object_header_info(m)
+ return loose_object_header_info(m)
finally:
- f.close()
m.close()
# END assure release of system resources
def object(self, sha):
- f, m = self._map_object(sha)
- type, size = self._object_header_info(m)
- # TODO: init a dynamic decompress stream from our memory map
+ m = self._map_loose_object(sha)
+ reader = DecompressMemMapReader(m, close_on_deletion = True)
+ type, size = reader.initialize()
+ return type, size, reader
def has_object(self, sha):
try:
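
For orientation, the new read path boils down to roughly the following minimal sketch (not part of the patch; Python 2, assuming a POSIX platform, with the ENOENT handling simplified - the patch additionally distinguishes a missing file from an O_NOATIME failure and caches the flag per instance):

    import os, mmap, zlib

    def read_loose_object_header(db_path):
        """Return (type_string, size) for the loose object file at db_path -- sketch only."""
        flags = os.O_RDONLY | getattr(os, 'O_NOATIME', 0)
        try:
            fd = os.open(db_path, flags)
        except OSError:
            # some mounts reject O_NOATIME - retry without it
            fd = os.open(db_path, os.O_RDONLY)
        # END fallback without O_NOATIME
        try:
            m = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
        finally:
            os.close(fd)        # the map keeps its own reference to the file
        # END assure the descriptor is closed
        try:
            # decompress just enough to see the "<type> <size>\0" header
            hdr = zlib.decompressobj().decompress(m, 8192)
            type_name, size = hdr[:hdr.find("\0")].split(" ")
            return type_name, int(size)
        finally:
            m.close()
        # END assure the map is released
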
@@ -210,25 +237,10 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
# open a tmp file to write the data to
fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path)
writer = FDCompressedSha1Writer(fd)
-
- # WRITE HEADER: type SP size NULL
- writer.write("%s %i%s" % (type, size, chr(0)))
-
- # WRITE ALL DATA
- chunksize = self.stream_chunk_size
+
try:
- try:
- while True:
- data_len = writer.write(stream.read(chunksize))
- if data_len < chunksize:
- # WRITE FOOTER
- writer.write('\n')
- break
- # END check for stream end
- # END duplicate data
- finally:
- writer.close()
- # END assure file was closed
+ write_object(type, size, stream, writer,
+ close_target_stream=True, chunk_size=self.stream_chunk_size)
except:
os.remove(tmp_path)
raise
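
To make the format that the write path produces concrete: a loose object is the zlib deflation of "<type> <size>\0<payload>", and the sha1 naming it is computed over the uncompressed bytes. A small sketch (not from the patch) that can be checked against git's well-known empty-blob sha:

    import zlib, hashlib

    def loose_object_bytes(type_name, data):
        """Return (hexsha, compressed_bytes) for a loose object -- illustrative only."""
        raw = "%s %i\0%s" % (type_name, len(data), data)
        return hashlib.sha1(raw).hexdigest(), zlib.compress(raw, 1)   # level 1 == Z_BEST_SPEED

    # the sha of git's empty blob
    assert loose_object_bytes("blob", "")[0] == "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"
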
diff --git a/lib/git/odb/fun.py b/lib/git/odb/fun.py
new file mode 100644
index 00000000..ee7144dd
--- /dev/null
+++ b/lib/git/odb/fun.py
@@ -0,0 +1,114 @@
+"""Contains basic c-functions which usually contain performance critical code
+Keeping this code separate from the beginning makes it easier to out-source
+it into c later, if required"""
+
+from git.errors import (
+ BadObjectType
+ )
+
+import zlib
+decompressobj = zlib.decompressobj
+
+
+# INVARIANTS
+type_id_to_type_map = {
+ 1 : "commit",
+ 2 : "tree",
+ 3 : "blob",
+ 4 : "tag"
+ }
+
+# used when dealing with larger streams
+chunk_size = 1000*1000
+
+
+#{ Routines
+
+def is_loose_object(m):
+ """:return: True the file contained in memory map m appears to be a loose object.
+ Only the first two bytes are needed"""
+ b0, b1 = map(ord, m[:2])
+ word = (b0 << 8) + b1
+ return b0 == 0x78 and (word % 31) == 0
+
+def loose_object_header_info(m):
+ """:return: tuple(type_string, uncompressed_size_in_bytes) the type string of the
+ object as well as its uncompressed size in bytes.
+ :param m: memory map from which to read the compressed object data"""
+ decompress_size = 8192 # is used in cgit as well
+ hdr = decompressobj().decompress(m, decompress_size)
+ type_name, size = hdr[:hdr.find("\0")].split(" ")
+ return type_name, int(size)
+
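
A quick round-trip over these two helpers (a sketch, not part of the patch): a zlib stream produced with the default settings starts with 0x78 and a header word divisible by 31, which is what is_loose_object keys on, and the decompressed prefix carries the "<type> <size>\0" header. The import path assumes the lib/ layout of this repository:

    import zlib
    from git.odb.fun import is_loose_object, loose_object_header_info

    compressed = zlib.compress("blob 11\0hello world")
    assert is_loose_object(compressed)                       # 0x78 marker, header word % 31 == 0
    assert loose_object_header_info(compressed) == ("blob", 11)
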
+def object_header_info(m):
+ """:return: tuple(type_string, uncompressed_size_in_bytes
+ :param mmap: mapped memory map. It will be
+ seeked to the actual start of the object contents, which can be used
+ to initialize a zlib decompress object.
+ :note: This routine can only handle new-style objects which are assumably contained
+ in packs
+ """
+ assert not is_loose_object(m), "Use loose_object_header_info instead"
+
+ c = ord(m[0]) # first byte
+ i = 1 # next char to read
+ type_id = (c >> 4) & 7 # numeric type
+ size = c & 15 # starting size
+ s = 4 # starting bit-shift size
+ while c & 0x80:
+ c = ord(m[i])
+ i += 1
+ size += (c & 0x7f) << s
+ s += 7
+ # END character loop
+
+ # finally seek the map to the start of the data stream
+ m.seek(i)
+ try:
+ return (type_id_to_type_map[type_id], size)
+ except KeyError:
+ # invalid object type - we could try to be smart now and decode part
+ # of the stream to get the info, problem is that we had trouble finding
+ # the exact start of the content stream
+ raise BadObjectType(type_id)
+ # END handle exceptions
+
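
The size decoding above reads git's pack entry header: the first byte holds the type in bits 4-6 and the low four size bits, and while the high bit is set, each following byte contributes seven more size bits (little-endian). A hypothetical encoder (not used by the patch) shows the inverse of that loop:

    def encode_pack_header(type_id, size):
        """Inverse of the parsing loop in object_header_info -- illustrative only."""
        c = (type_id << 4) | (size & 15)
        size >>= 4
        out = []
        while size:
            out.append(chr(c | 0x80))     # high bit set: more size bytes follow
            c = size & 0x7f
            size >>= 7
        out.append(chr(c))
        return ''.join(out)

    # type_id 3 == blob, size 300 -> two header bytes
    assert encode_pack_header(3, 300) == "\xbc\x12"
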
+def write_object(type, size, source_stream, target_stream, close_target_stream=True,
+ chunk_size=chunk_size):
+ """Write the object as identified by type, size and source_stream into the
+ target_stream
+
+ :param type: type string of the object
+ :param size: amount of bytes to write from source_stream
+ :param source_stream: stream as file-like object providing at least size bytes
+ :param target_stream: stream as file-like object to receive the data
+ :param close_target_stream: if True, the target stream will be closed when
+ the routine exits, even if an error is thrown
+ :param chunk_size: size of chunks to read from source. Larger values can be beneficial
+ for io performance, but cost more memory as well
+ :return: The actual amount of bytes written to the target stream, which includes the header
+ tbw = 0 # total num bytes written
+ dbw = 0 # num data bytes written
+ try:
+ # WRITE HEADER: type SP size NULL
+ tbw += target_stream.write("%s %i\0" % (type, size))
+
+ # WRITE ALL DATA UP TO SIZE
+ while True:
+ cs = min(chunk_size, size-dbw)
+ data_len = target_stream.write(source_stream.read(cs))
+ dbw += data_len
+ if data_len < cs or dbw == size:
+ tbw += dbw
+ break
+ # END check for stream end
+ # END duplicate data
+ return tbw
+ finally:
+ if close_target_stream:
+ target_stream.close()
+ # END handle stream closing
+ # END assure file was closed
+
+
+#} END routines
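
Worth noting: write_object accumulates the return values of target_stream.write(), so the target must be a writer whose write() returns the number of bytes written, as FDCompressedSha1Writer is expected to; a plain file or cStringIO object returns None there. A usage sketch with a hypothetical counting wrapper (import path assumes the lib/ layout):

    from cStringIO import StringIO
    from git.odb.fun import write_object

    class CountingWriter(object):
        """Hypothetical stand-in for FDCompressedSha1Writer: write() returns the byte count."""
        def __init__(self):
            self.buf = StringIO()
        def write(self, data):
            self.buf.write(data)
            return len(data)
        def close(self):
            pass

    data = "hello world"
    target = CountingWriter()
    tbw = write_object("blob", len(data), StringIO(data), target, close_target_stream=False)
    assert target.buf.getvalue() == "blob 11\0hello world"
    assert tbw == 19                                  # 8 header bytes + 11 payload bytes
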
diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py
index d88dca1a..8a054201 100644
--- a/lib/git/odb/utils.py
+++ b/lib/git/odb/utils.py
@@ -1,7 +1,10 @@
import binascii
import os
import zlib
+from cStringIO import StringIO
from git.utils import make_sha
+import errno
+from fun import chunk_size
__all__ = ('FDSha1Writer', )
@@ -21,8 +24,10 @@ def to_bin_sha(sha):
return sha
return hex_to_bin(sha)
+# errors
+ENOENT = errno.ENOENT
+
# os shortcuts
-getsize = os.path.getsize
exists = os.path.exists
mkdir = os.mkdir
isdir = os.path.isdir
@@ -32,6 +37,11 @@ join = os.path.join
read = os.read
write = os.write
close = os.close
+
+# ZLIB configuration
+# used when compressing objects
+Z_BEST_SPEED = 1
+
#} END Routines
@@ -50,7 +60,7 @@ class FDCompressedSha1Writer(object):
def __init__(self, fd):
self.fd = fd
self.sha1 = make_sha("")
- self.zip = zlib.compressobj()
+ self.zip = zlib.compressobj(Z_BEST_SPEED)
def write(self, data):
""":raise IOError: If not all bytes could be written
@@ -76,4 +86,137 @@ class FDCompressedSha1Writer(object):
return close(self.fd)
+class DecompressMemMapReader(object):
+ """Reads data in chunks from a memory map and decompresses it. The client sees
+ only the uncompressed data, respective file-like read calls are handling on-demand
+ buffered decompression accordingly
+
+ A constraint on the total size of bytes is activated, simulating
+ a logical file within a possibly larger physical memory area
+
+ To read efficiently, you clearly don't want to read individual bytes, instead,
+ read a few kilobytes at least.
+
+ :note: The chunk-size should be carefully selected as it will involve quite a bit
+ of string copying due to the way the zlib is implemented. Its very wasteful,
+ hence we try to find a good tradeoff between allocation time and number of
+ times we actually allocate. An own zlib implementation would be good here
+ to better support streamed reading - it would only need to keep the mmap
+ and decompress it into chunks, thats all ... """
+ __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_cs', '_close')
+
+ def __init__(self, m, close_on_deletion, cs = 128*1024):
+ """Initialize with mmap and chunk_size for stream reading"""
+ self._m = m
+ self._zip = zlib.decompressobj()
+ self._buf = None # buffer of decompressed bytes
+ self._buflen = 0 # length of bytes in buffer
+ self._s = 0 # size of uncompressed data to read in total
+ self._br = 0 # num uncompressed bytes read
+ self._cws = 0 # start byte of compression window
+ self._cwe = 0 # end byte of compression window
+ self._cs = cs # chunk size (when reading from zip)
+ self._close = close_on_deletion # close the memmap on deletion ?
+
+ def __del__(self):
+ if self._close:
+ self._m.close()
+ # END handle resource freeing
+
+ def initialize(self, size=0):
+ """Initialize this instance for acting as a read-only stream for size bytes.
+ :param size: size in bytes to be decompressed before being depleted.
+ If 0, the object header information is parsed from the data,
+ returning a tuple of (type_string, uncompressed_size)
+ If not 0, the size will be used, and None is returned.
+ :note: must only be called exactly once"""
+ if size:
+ self._s = size
+ return
+ # END handle size
+
+ # read header
+ maxb = 8192
+ self._s = maxb
+ hdr = self.read(maxb)
+ hdrend = hdr.find("\0")
+ type, size = hdr[:hdrend].split(" ")
+ self._s = int(size)
+
+ # adjust internal state to match actual header length that we ignore
+ # The buffer will be depleted first on future reads
+ self._br = 0
+ hdrend += 1 # count terminating \0
+ self._buf = StringIO(hdr[hdrend:])
+ self._buflen = len(hdr) - hdrend
+
+ return type, self._s
+
+ def read(self, size=-1):
+ if size < 1:
+ size = self._s - self._br
+ else:
+ size = min(size, self._s - self._br)
+ # END clamp size
+
+ if size == 0:
+ return str()
+ # END handle depletion
+
+ # deplete the buffer, then just continue using the decompress object
+ # which has its own buffer. We just need this to transparently parse the
+ # header from the zlib stream
+ dat = str()
+ if self._buf:
+ if self._buflen >= size:
+ # have enough data
+ dat = self._buf.read(size)
+ self._buflen -= size
+ self._br += size
+ return dat
+ else:
+ dat = self._buf.read() # ouch, duplicates data
+ size -= self._buflen
+ self._br += self._buflen
+
+ self._buflen = 0
+ self._buf = None
+ # END handle buffer len
+ # END handle buffer
+
+ # decompress some data
+ # Abstract: zlib needs to operate on chunks of our memory map (which may
+ # be large), as it will otherwise always fill in the 'unconsumed_tail'
+ # attribute, which possibly reads our whole map to the end, forcing
+ # everything to be read from disk even though just a portion was requested.
+ # As this would be a no-go, we work around it by passing only chunks of data,
+ # moving the window into the memory map along as we decompress, which keeps
+ # the tail smaller than our chunk-size. This causes 'only' the chunk to be
+ # copied once, plus another copy of a part of it when it creates the unconsumed
+ # tail. We have to use that tail to hand in the appropriate amount of bytes during
+ # the next read.
+ if self._zip.unconsumed_tail:
+ # move the window, make it as large as size demands. For code-clarity,
+ # we just take the chunk from our map again instead of reusing the unconsumed
+ # tail. The latter would save some memory copying, but we could end up
+ # not getting enough data uncompressed, so we would have to sort that out as well.
+ # Now we just assume the worst case, i.e. the data is not compressed at all, so the window
+ # needs to be as large as the uncompressed bytes we want to read.
+ self._cws = self._cwe - len(self._zip.unconsumed_tail)
+ self._cwe = self._cws + size
+ indata = self._m[self._cws:self._cwe] # another copy ... :(
+ else:
+ self._cws = self._cwe
+ self._cwe = self._cws + size
+ indata = self._m[self._cws:self._cwe] # ... copy it again :(
+ # END handle tail
+
+ dcompdat = self._zip.decompress(indata, size)
+ self._br += len(dcompdat)
+
+ if dat:
+ return dat + dcompdat
+ return dcompdat
+
#} END classes
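
Two sketches to close, neither part of the patch. First, the zlib behaviour that forces the windowing above: decompress(data, max_length) stops after max_length output bytes and copies the unused input into unconsumed_tail, which must be fed back in on the next call. Second, a minimal use of the reader; a plain string stands in for the memory map here since only slicing (and optionally close()) is required of it, and the import path assumes the lib/ layout shown in this diff:

    import zlib
    from git.odb.utils import DecompressMemMapReader

    # 1) max_length / unconsumed_tail behaviour
    comp = zlib.compress("x" * 10000)
    d = zlib.decompressobj()
    out = d.decompress(comp, 100)               # at most 100 decompressed bytes
    assert len(out) == 100 and d.unconsumed_tail
    out += d.decompress(d.unconsumed_tail, 100) # hand the tail back in
    assert len(out) == 200

    # 2) reading a loose-object style stream through the reader
    stream = zlib.compress("blob 11\0hello world")
    reader = DecompressMemMapReader(stream, close_on_deletion=False)
    type_name, size = reader.initialize()       # parses the "<type> <size>\0" header
    assert (type_name, int(size)) == ("blob", 11)
    assert reader.read() == "hello world"
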