From 282018b79cc8df078381097cb3aeb29ff56e83c6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 2 Jun 2010 20:11:00 +0200 Subject: Added first design and frame for object database. In a first step, loose objects will be written using our utilities, and certain object retrieval functionality moves into the GitObjectDatabase which is used by the repo instance Added performance test for object database access, which shows quite respectable tree parsing performance, and okay blob access. Nonetheless, it will be hard to beat the c performance using a pure python implementation, but it can be a nice practice to write it anyway to allow more direct pack manipulations. Some could benefit from the ability to write packs as these can serve as local cache if alternates are used --- lib/git/odb/__init__.py | 2 + lib/git/odb/db.py | 129 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 lib/git/odb/__init__.py create mode 100644 lib/git/odb/db.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/__init__.py b/lib/git/odb/__init__.py new file mode 100644 index 00000000..17000244 --- /dev/null +++ b/lib/git/odb/__init__.py @@ -0,0 +1,2 @@ +"""Initialize the object database module""" + diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py new file mode 100644 index 00000000..fd1b640a --- /dev/null +++ b/lib/git/odb/db.py @@ -0,0 +1,129 @@ +"""Contains implementations of database retrieveing objects""" +import os +from git.errors import InvalidDBRoot + + +class iObjectDBR(object): + """Defines an interface for object database lookup. + Objects are identified either by hex-sha (40 bytes) or + by sha (20 bytes)""" + __slots__ = tuple() + + #{ Query Interface + def has_obj_hex(self, hexsha): + """:return: True if the object identified by the given 40 byte hexsha is + contained in the database""" + raise NotImplementedError("To be implemented in subclass") + + def has_obj_bin(self, sha): + """:return: as ``has_obj_hex``, but takes a 20 byte binary sha""" + raise NotImplementedError("To be implemented in subclass") + + def obj_hex(self, hexsha): + """:return: tuple(type_string, size_in_bytes, stream) a tuple with object + information including its type, its size as well as a stream from which its + contents can be read""" + raise NotImplementedError("To be implemented in subclass") + + def obj_bin(self, sha): + """:return: as in ``obj_hex``, but takes a binary sha""" + raise NotImplementedError("To be implemented in subclass") + + def obj_info_hex(self, hexsha): + """:return: tuple(type_string, size_in_bytes) tuple with the object's type + string as well as its size in bytes""" + raise NotImplementedError("To be implemented in subclass") + + #} END query interface + +class iObjectDBW(object): + """Defines an interface to create objects in the database""" + __slots__ = tuple() + + #{ Edit Interface + + def to_obj(self, type, size, stream, dry_run=False, sha_as_hex=True): + """Create a new object in the database + :return: the sha identifying the object in the database + :param type: type string identifying the object + :param size: size of the data to read from stream + :param stream: stream providing the data + :param dry_run: if True, the object database will not actually be changed + :param sha_as_hex: if True, the returned sha identifying the object will be + hex encoded, not binary""" + raise NotImplementedError("To be implemented in subclass") + + def to_objs(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): + """Create multiple new 
objects in the database + :return: sequence of shas identifying the created objects in the order in which + they where given. + :param iter_info: iterable yielding tuples containing the type_string + size_in_bytes and the steam with the content data. + :param dry_run: see ``to_obj`` + :param sha_as_hex: see ``to_obj`` + :param max_threads: if < 1, any number of threads may be started while processing + the request, otherwise the given number of threads will be started.""" + # a trivial implementation, ignoring the threads for now + # TODO: add configuration to the class to determine whether we may + # actually use multiple threads, default False of course. If the add + shas = list() + for args in iter_info: + shas.append(self.to_obj(*args, dry_run=dry_run, sha_as_hex=sha_as_hex)) + return shas + + #} END edit interface + + +class FileDBBase(object): + """Provides basic facilities to retrieve files of interest, including + caching facilities to help mapping hexsha's to objects""" + __slots__ = ('_root_path', ) + + def __init__(self, root_path): + """Initialize this instance to look for its files at the given root path + All subsequent operations will be relative to this path + :raise InvalidDBRoot: + :note: The base will perform basic checking for accessability, but the subclass + is required to verify that the root_path contains the database structure it needs""" + if not os.path.isdir(root_path): + raise InvalidDBRoot(root_path) + self._root_path = root_path + + + #{ Interface + def root_path(self): + """:return: path at which this db operates""" + return self._root_path + + #} END interface + + #{ Utiltities + def _root_rela_path(self, rela_path): + """:return: the given relative path relative to our database root""" + return os.path.join(self._root_path, rela_path) + + #} END utilities + + +class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): + """A database which operates on loose object files""" + + +class PackedDB(FileDBBase, iObjectDBR): + """A database operating on a set of object packs""" + + +class CompoundDB(iObjectDBR): + """A database which delegates calls to sub-databases""" + + +class ReferenceDB(CompoundDB): + """A database consisting of database referred to in a file""" + + +class GitObjectDB(CompoundDB, iObjectDBW): + """A database representing the default git object store, which includes loose + objects, pack files and an alternates file + + It will create objects only in the loose object database.""" + -- cgit v1.2.1 From 8b86f9b399a8f5af792a04025fdeefc02883f3e5 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 2 Jun 2010 22:40:52 +0200 Subject: initial version of loose object writing and simple cached object lookup appears to be working --- lib/git/odb/db.py | 148 ++++++++++++++++++++++++++++++++++++++++++--------- lib/git/odb/utils.py | 78 +++++++++++++++++++++++++++ 2 files changed, 202 insertions(+), 24 deletions(-) create mode 100644 lib/git/odb/utils.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index fd1b640a..204da9ad 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -1,6 +1,21 @@ """Contains implementations of database retrieveing objects""" import os from git.errors import InvalidDBRoot +from git.utils import IndexFileSHA1Writer + +from utils import ( + to_hex_sha, + exists, + hex_to_bin, + FDCompressedSha1Writer, + isdir, + mkdir, + rename, + dirname, + join + ) + +import tempfile class iObjectDBR(object): @@ -9,29 +24,29 @@ class iObjectDBR(object): by sha (20 bytes)""" __slots__ = tuple() + def 
__contains__(self, sha): + return self.has_obj + #{ Query Interface - def has_obj_hex(self, hexsha): - """:return: True if the object identified by the given 40 byte hexsha is - contained in the database""" - raise NotImplementedError("To be implemented in subclass") - - def has_obj_bin(self, sha): - """:return: as ``has_obj_hex``, but takes a 20 byte binary sha""" - raise NotImplementedError("To be implemented in subclass") - - def obj_hex(self, hexsha): - """:return: tuple(type_string, size_in_bytes, stream) a tuple with object - information including its type, its size as well as a stream from which its - contents can be read""" + def has_object(self, sha): + """ + :return: True if the object identified by the given 40 byte hexsha or 20 bytes + binary sha is contained in the database""" raise NotImplementedError("To be implemented in subclass") - def obj_bin(self, sha): - """:return: as in ``obj_hex``, but takes a binary sha""" + def object(self, sha): + """ + :return: tuple(type_string, size_in_bytes, stream) a tuple with object + information including its type, its size as well as a stream from which its + contents can be read + :param sha: 40 bytes hexsha or 20 bytes binary sha """ raise NotImplementedError("To be implemented in subclass") - def obj_info_hex(self, hexsha): - """:return: tuple(type_string, size_in_bytes) tuple with the object's type - string as well as its size in bytes""" + def object_info(self, sha): + """ + :return: tuple(type_string, size_in_bytes) tuple with the object's type + string as well as its size in bytes + :param sha: 40 bytes hexsha or 20 bytes binary sha""" raise NotImplementedError("To be implemented in subclass") #} END query interface @@ -42,7 +57,7 @@ class iObjectDBW(object): #{ Edit Interface - def to_obj(self, type, size, stream, dry_run=False, sha_as_hex=True): + def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): """Create a new object in the database :return: the sha identifying the object in the database :param type: type string identifying the object @@ -53,7 +68,7 @@ class iObjectDBW(object): hex encoded, not binary""" raise NotImplementedError("To be implemented in subclass") - def to_objs(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): + def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): """Create multiple new objects in the database :return: sequence of shas identifying the created objects in the order in which they where given. @@ -68,7 +83,7 @@ class iObjectDBW(object): # actually use multiple threads, default False of course. 
If the add shas = list() for args in iter_info: - shas.append(self.to_obj(*args, dry_run=dry_run, sha_as_hex=sha_as_hex)) + shas.append(self.to_object(*args, dry_run=dry_run, sha_as_hex=sha_as_hex)) return shas #} END edit interface @@ -95,18 +110,103 @@ class FileDBBase(object): """:return: path at which this db operates""" return self._root_path + def db_path(self, rela_path): + """ + :return: the given relative path relative to our database root, allowing + to pontentially access datafiles""" + return join(self._root_path, rela_path) #} END interface #{ Utiltities - def _root_rela_path(self, rela_path): - """:return: the given relative path relative to our database root""" - return os.path.join(self._root_path, rela_path) + #} END utilities class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): """A database which operates on loose object files""" + __slots__ = ('_hexsha_to_file', ) + + # CONFIGURATION + # chunks in which data will be copied between streams + stream_chunk_size = 1000*1000 + + def __init__(self, root_path): + super(LooseObjectDB, self).__init__(root_path) + self._hexsha_to_file = dict() + + #{ Interface + def hexsha_to_object_path(self, hexsha): + """ + :return: path at which the object with the given hexsha would be stored, + relative to the database root""" + return join(hexsha[:2], hexsha[2:]) + + #} END interface + + def has_object(self, sha): + sha = to_hex_sha(sha) + # try cache + if sha in self._hexsha_to_file: + return True + + # try filesystem + path = self.db_path(self.hexsha_to_object_path(sha)) + if exists(path): + self._hexsha_to_file[sha] = path + return True + # END handle cache + return False + + def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): + # open a tmp file to write the data to + fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) + writer = FDCompressedSha1Writer(fd) + + # WRITE HEADER: type SP size NULL + writer.write("%s %i%s" % (type, size, chr(0))) + + # WRITE ALL DATA + chunksize = self.stream_chunk_size + try: + try: + while True: + data_len = writer.write(stream.read(chunksize)) + if data_len < chunksize: + # WRITE FOOTER + writer.write('\n') + break + # END check for stream end + # END duplicate data + finally: + writer.close() + # END assure file was closed + except: + os.remove(tmp_path) + raise + # END assure tmpfile removal on error + + + # in dry-run mode, we delete the file afterwards + sha = writer.sha(as_hex=True) + + if dry_run: + os.remove(tmp_path) + else: + # rename the file into place + obj_path = self.db_path(self.hexsha_to_object_path(sha)) + obj_dir = dirname(obj_path) + if not isdir(obj_dir): + mkdir(obj_dir) + # END handle destination directory + rename(tmp_path, obj_path) + # END handle dry_run + + if not sha_as_hex: + sha = hex_to_bin(sha) + # END handle sha format + + return sha class PackedDB(FileDBBase, iObjectDBR): diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py new file mode 100644 index 00000000..04d3eaba --- /dev/null +++ b/lib/git/odb/utils.py @@ -0,0 +1,78 @@ +import binascii +import os +import zlib +from git.utils import make_sha + +__all__ = ('FDSha1Writer', ) + +#{ Routines + +hex_to_bin = binascii.a2b_hex +bin_to_hex = binascii.b2a_hex + +def to_hex_sha(sha): + """:return: hexified version of sha""" + if len(sha) == 40: + return sha + return bin_to_hex(sha) + +def to_bin_sha(sha): + if len(sha) == 20: + return sha + return hex_to_bin(sha) + +# os shortcuts +exists = os.path.exists +mkdir = os.mkdir +isdir = os.path.isdir +rename = os.rename +dirname = 
os.path.dirname +join = os.path.join +read = os.read +write = os.write +close = os.close +#} END Routines + + +#{ Classes + +class FDCompressedSha1Writer(object): + """Digests data written to it, making the sha available, then compress the + data and write it to the file descriptor + :note: operates on raw file descriptors + :note: for this to work, you have to use the close-method of this instance""" + __slots__ = ("fd", "sha1", "zip") + + # default exception + exc = IOError("Failed to write all bytes to filedescriptor") + + def __init__(self, fd): + self.fd = fd + self.sha1 = make_sha("") + self.zip = zlib.compressobj() + + def write(self, data): + """:raise IOError: If not all bytes could be written + :return: lenght of incoming data""" + self.sha1.update(data) + cdata = self.zip.compress(data) + bytes_written = write(self.fd, cdata) + if bytes_written != len(cdata): + raise self.exc + return bytes_written + + def sha(self, as_hex = False): + """:return: sha so far + :param as_hex: if True, sha will be hex-encoded, binary otherwise""" + if as_hex: + return self.sha1.hexdigest() + return self.sha1.digest() + + def close(self): + remainder = self.zip.flush() + if write(self.fd, remainder) != len(remainder): + raise self.exc + return close(self.fd) + + +#} END classes -- cgit v1.2.1 From 6f8ce8901e21587cd2320562df412e05b5ab1731 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 2 Jun 2010 23:53:29 +0200 Subject: added frame for object reading, including simple test --- lib/git/odb/db.py | 76 ++++++++++++++++++++++++++++++++++++++++++---------- lib/git/odb/utils.py | 1 + 2 files changed, 63 insertions(+), 14 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 204da9ad..1248a3f4 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -1,9 +1,13 @@ """Contains implementations of database retrieveing objects""" import os -from git.errors import InvalidDBRoot +from git.errors import ( + InvalidDBRoot, + BadObject + ) from git.utils import IndexFileSHA1Writer from utils import ( + getsize, to_hex_sha, exists, hex_to_bin, @@ -16,6 +20,7 @@ from utils import ( ) import tempfile +import mmap class iObjectDBR(object): @@ -136,27 +141,70 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): self._hexsha_to_file = dict() #{ Interface - def hexsha_to_object_path(self, hexsha): + def object_path(self, hexsha): """ :return: path at which the object with the given hexsha would be stored, relative to the database root""" return join(hexsha[:2], hexsha[2:]) - #} END interface - - def has_object(self, sha): - sha = to_hex_sha(sha) - # try cache - if sha in self._hexsha_to_file: - return True + def readable_db_object_path(self, hexsha): + """ + :return: readable object path to the object identified by hexsha + :raise BadObject: If the object file does not exist""" + try: + return self._hexsha_to_file[hexsha] + except KeyError: + pass + # END ignore cache misses # try filesystem - path = self.db_path(self.hexsha_to_object_path(sha)) + path = self.db_path(self.object_path(hexsha)) if exists(path): - self._hexsha_to_file[sha] = path - return True + self._hexsha_to_file[hexsha] = path + return path # END handle cache - return False + raise BadObject(hexsha) + + #} END interface + + def _object_header_info(self, mmap): + """:return: tuple(type_string, uncompressed_size_in_bytes + :param mmap: newly mapped memory map at position 0. 
It will be + seeked to the actual start of the object contents, which can be used + to initialize a zlib decompress object.""" + raise NotImplementedError("todo") + + def _map_object(self, sha): + """ + :return: tuple(file, mmap) tuple with an opened file for reading, and + a memory map of that file""" + db_path = self.readable_db_object_path(to_hex_sha(sha)) + f = open(db_path, 'rb') + m = mmap.mmap(f.fileno(), getsize(db_path), access=mmap.ACCESS_READ) + return (f, m) + + def object_info(self, sha): + f, m = self._map_object(sha) + try: + type, size = self._object_header_info(m) + finally: + f.close() + m.close() + # END assure release of system resources + + def object(self, sha): + f, m = self._map_object(sha) + type, size = self._object_header_info(m) + # TODO: init a dynamic decompress stream from our memory map + + + def has_object(self, sha): + try: + self.readable_db_object_path(to_hex_sha(sha)) + return True + except BadObject: + return False + # END check existance def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): # open a tmp file to write the data to @@ -194,7 +242,7 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): os.remove(tmp_path) else: # rename the file into place - obj_path = self.db_path(self.hexsha_to_object_path(sha)) + obj_path = self.db_path(self.object_path(sha)) obj_dir = dirname(obj_path) if not isdir(obj_dir): mkdir(obj_dir) diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index 04d3eaba..d88dca1a 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -22,6 +22,7 @@ def to_bin_sha(sha): return hex_to_bin(sha) # os shortcuts +getsize = os.path.getsize exists = os.path.exists mkdir = os.mkdir isdir = os.path.isdir -- cgit v1.2.1 From 38d59fc8ccccae8882fa48671377bf40a27915a7 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 3 Jun 2010 16:35:35 +0200 Subject: odb: implemented loose object streaming, which is impossible to do efficiently considering that it copies string buffers all the time --- lib/git/odb/db.py | 114 +++++++++++++++++++++------------------ lib/git/odb/fun.py | 114 +++++++++++++++++++++++++++++++++++++++ lib/git/odb/utils.py | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 322 insertions(+), 53 deletions(-) create mode 100644 lib/git/odb/fun.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 1248a3f4..5c50a512 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -1,17 +1,18 @@ """Contains implementations of database retrieveing objects""" -import os +from git.utils import IndexFileSHA1Writer from git.errors import ( InvalidDBRoot, - BadObject + BadObject, + BadObjectType ) -from git.utils import IndexFileSHA1Writer from utils import ( - getsize, + DecompressMemMapReader, + FDCompressedSha1Writer, + ENOENT, to_hex_sha, exists, hex_to_bin, - FDCompressedSha1Writer, isdir, mkdir, rename, @@ -19,8 +20,15 @@ from utils import ( join ) +from fun import ( + chunk_size, + loose_object_header_info, + write_object + ) + import tempfile import mmap +import os class iObjectDBR(object): @@ -36,7 +44,8 @@ class iObjectDBR(object): def has_object(self, sha): """ :return: True if the object identified by the given 40 byte hexsha or 20 bytes - binary sha is contained in the database""" + binary sha is contained in the database + :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") def object(self, sha): @@ -44,14 +53,16 @@ class iObjectDBR(object): :return: tuple(type_string, size_in_bytes, stream) a tuple with 
object information including its type, its size as well as a stream from which its contents can be read - :param sha: 40 bytes hexsha or 20 bytes binary sha """ + :param sha: 40 bytes hexsha or 20 bytes binary sha + :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") def object_info(self, sha): """ :return: tuple(type_string, size_in_bytes) tuple with the object's type string as well as its size in bytes - :param sha: 40 bytes hexsha or 20 bytes binary sha""" + :param sha: 40 bytes hexsha or 20 bytes binary sha + :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") #} END query interface @@ -70,7 +81,8 @@ class iObjectDBW(object): :param stream: stream providing the data :param dry_run: if True, the object database will not actually be changed :param sha_as_hex: if True, the returned sha identifying the object will be - hex encoded, not binary""" + hex encoded, not binary + :raise IOError: if data could not be written""" raise NotImplementedError("To be implemented in subclass") def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): @@ -82,7 +94,8 @@ class iObjectDBW(object): :param dry_run: see ``to_obj`` :param sha_as_hex: see ``to_obj`` :param max_threads: if < 1, any number of threads may be started while processing - the request, otherwise the given number of threads will be started.""" + the request, otherwise the given number of threads will be started. + :raise IOError: if data could not be written""" # a trivial implementation, ignoring the threads for now # TODO: add configuration to the class to determine whether we may # actually use multiple threads, default False of course. If the add @@ -130,15 +143,19 @@ class FileDBBase(object): class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): """A database which operates on loose object files""" - __slots__ = ('_hexsha_to_file', ) - + __slots__ = ('_hexsha_to_file', '_fd_open_flags') # CONFIGURATION # chunks in which data will be copied between streams - stream_chunk_size = 1000*1000 + stream_chunk_size = chunk_size + def __init__(self, root_path): super(LooseObjectDB, self).__init__(root_path) self._hexsha_to_file = dict() + # Additional Flags - might be set to 0 after the first failure + # Depending on the root, this might work for some mounts, for others not, which + # is why it is per instance + self._fd_open_flags = os.O_NOATIME #{ Interface def object_path(self, hexsha): @@ -167,36 +184,46 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): #} END interface - def _object_header_info(self, mmap): - """:return: tuple(type_string, uncompressed_size_in_bytes - :param mmap: newly mapped memory map at position 0. 
It will be - seeked to the actual start of the object contents, which can be used - to initialize a zlib decompress object.""" - raise NotImplementedError("todo") - - def _map_object(self, sha): + def _map_loose_object(self, sha): """ - :return: tuple(file, mmap) tuple with an opened file for reading, and - a memory map of that file""" - db_path = self.readable_db_object_path(to_hex_sha(sha)) - f = open(db_path, 'rb') - m = mmap.mmap(f.fileno(), getsize(db_path), access=mmap.ACCESS_READ) - return (f, m) + :return: memory map of that file to allow random read access + :raise BadObject: if object could not be located""" + db_path = self.db_path(self.object_path(to_hex_sha(sha))) + try: + fd = os.open(db_path, os.O_RDONLY|self._fd_open_flags) + except OSError,e: + if e.errno != ENOENT: + # try again without noatime + try: + fd = os.open(db_path, os.O_RDONLY) + except OSError: + raise BadObject(to_hex_sha(sha)) + # didn't work because of our flag, don't try it again + self._fd_open_flags = 0 + else: + raise BadObject(to_hex_sha(sha)) + # END handle error + # END exception handling + try: + return mmap.mmap(fd, 0, access=mmap.ACCESS_READ) + finally: + os.close(fd) + # END assure file is closed def object_info(self, sha): - f, m = self._map_object(sha) + m = self._map_loose_object(sha) try: - type, size = self._object_header_info(m) + return loose_object_header_info(m) finally: - f.close() m.close() # END assure release of system resources def object(self, sha): - f, m = self._map_object(sha) - type, size = self._object_header_info(m) - # TODO: init a dynamic decompress stream from our memory map + m = self._map_loose_object(sha) + reader = DecompressMemMapReader(m, close_on_deletion = True) + type, size = reader.initialize() + return type, size, reader def has_object(self, sha): try: @@ -210,25 +237,10 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): # open a tmp file to write the data to fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) writer = FDCompressedSha1Writer(fd) - - # WRITE HEADER: type SP size NULL - writer.write("%s %i%s" % (type, size, chr(0))) - - # WRITE ALL DATA - chunksize = self.stream_chunk_size + try: - try: - while True: - data_len = writer.write(stream.read(chunksize)) - if data_len < chunksize: - # WRITE FOOTER - writer.write('\n') - break - # END check for stream end - # END duplicate data - finally: - writer.close() - # END assure file was closed + write_object(type, size, stream, writer, + close_target_stream=True, chunk_size=self.stream_chunk_size) except: os.remove(tmp_path) raise diff --git a/lib/git/odb/fun.py b/lib/git/odb/fun.py new file mode 100644 index 00000000..ee7144dd --- /dev/null +++ b/lib/git/odb/fun.py @@ -0,0 +1,114 @@ +"""Contains basic c-functions which usually contain performance critical code +Keeping this code separate from the beginning makes it easier to out-source +it into c later, if required""" + +from git.errors import ( + BadObjectType + ) + +import zlib +decompressobj = zlib.decompressobj + + +# INVARIANTS +type_id_to_type_map = { + 1 : "commit", + 2 : "tree", + 3 : "blob", + 4 : "tag" + } + +# used when dealing with larger streams +chunk_size = 1000*1000 + + +#{ Routines + +def is_loose_object(m): + """:return: True the file contained in memory map m appears to be a loose object. 
+ Only the first two bytes are needed""" + b0, b1 = map(ord, m[:2]) + word = (b0 << 8) + b1 + return b0 == 0x78 and (word % 31) == 0 + +def loose_object_header_info(m): + """:return: tuple(type_string, uncompressed_size_in_bytes) the type string of the + object as well as its uncompressed size in bytes. + :param m: memory map from which to read the compressed object data""" + decompress_size = 8192 # is used in cgit as well + hdr = decompressobj().decompress(m, decompress_size) + type_name, size = hdr[:hdr.find("\0")].split(" ") + return type_name, int(size) + +def object_header_info(m): + """:return: tuple(type_string, uncompressed_size_in_bytes + :param mmap: mapped memory map. It will be + seeked to the actual start of the object contents, which can be used + to initialize a zlib decompress object. + :note: This routine can only handle new-style objects which are assumably contained + in packs + """ + assert not is_loose_object(m), "Use loose_object_header_info instead" + + c = b0 # first byte + i = 1 # next char to read + type_id = (c >> 4) & 7 # numeric type + size = c & 15 # starting size + s = 4 # starting bit-shift size + while c & 0x80: + c = ord(m[i]) + i += 1 + size += (c & 0x7f) << s + s += 7 + # END character loop + + # finally seek the map to the start of the data stream + m.seek(i) + try: + return (type_id_to_type_map[type_id], size) + except KeyError: + # invalid object type - we could try to be smart now and decode part + # of the stream to get the info, problem is that we had trouble finding + # the exact start of the content stream + raise BadObjectType(type_id) + # END handle exceptions + +def write_object(type, size, source_stream, target_stream, close_target_stream=True, + chunk_size=chunk_size): + """Write the object as identified by type, size and source_stream into the + target_stream + + :param type: type string of the object + :param size: amount of bytes to write from source_stream + :param source_stream: stream as file-like object providing at least size bytes + :param target_stream: stream as file-like object to receive the data + :param close_target_stream: if True, the target stream will be closed when + the routine exits, even if an error is thrown + :param chunk_size: size of chunks to read from source. 
Larger values can be beneficial + for io performance, but cost more memory as well + :return: The actual amount of bytes written to stream, which includes the header and a trailing newline""" + tbw = 0 # total num bytes written + dbw = 0 # num data bytes written + try: + # WRITE HEADER: type SP size NULL + tbw += target_stream.write("%s %i\0" % (type, size)) + + # WRITE ALL DATA UP TO SIZE + while True: + cs = min(chunk_size, size-dbw) + data_len = target_stream.write(source_stream.read(cs)) + dbw += data_len + if data_len < cs or dbw == size: + tbw += dbw + break + # END check for stream end + # END duplicate data + return tbw + finally: + if close_target_stream: + target_stream.close() + # END handle stream closing + # END assure file was closed + + +#} END routines diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index d88dca1a..8a054201 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -1,7 +1,10 @@ import binascii import os import zlib +from cStringIO import StringIO from git.utils import make_sha +import errno +from fun import chunk_size __all__ = ('FDSha1Writer', ) @@ -21,8 +24,10 @@ def to_bin_sha(sha): return sha return hex_to_bin(sha) +# errors +ENOENT = errno.ENOENT + # os shortcuts -getsize = os.path.getsize exists = os.path.exists mkdir = os.mkdir isdir = os.path.isdir @@ -32,6 +37,11 @@ join = os.path.join read = os.read write = os.write close = os.close + +# ZLIB configuration +# used when compressing objects +Z_BEST_SPEED = 1 + #} END Routines @@ -50,7 +60,7 @@ class FDCompressedSha1Writer(object): def __init__(self, fd): self.fd = fd self.sha1 = make_sha("") - self.zip = zlib.compressobj() + self.zip = zlib.compressobj(Z_BEST_SPEED) def write(self, data): """:raise IOError: If not all bytes could be written @@ -76,4 +86,137 @@ class FDCompressedSha1Writer(object): return close(self.fd) +class DecompressMemMapReader(object): + """Reads data in chunks from a memory map and decompresses it. The client sees + only the uncompressed data, respective file-like read calls are handling on-demand + buffered decompression accordingly + + A constraint on the total size of bytes is activated, simulating + a logical file within a possibly larger physical memory area + + To read efficiently, you clearly don't want to read individual bytes, instead, + read a few kilobytes at least. + + :note: The chunk-size should be carefully selected as it will involve quite a bit + of string copying due to the way the zlib is implemented. Its very wasteful, + hence we try to find a good tradeoff between allocation time and number of + times we actually allocate. An own zlib implementation would be good here + to better support streamed reading - it would only need to keep the mmap + and decompress it into chunks, thats all ... """ + __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_cs', '_close') + + def __init__(self, m, close_on_deletion, cs = 128*1024): + """Initialize with mmap and chunk_size for stream reading""" + self._m = m + self._zip = zlib.decompressobj() + self._buf = None # buffer of decompressed bytes + self._buflen = 0 # length of bytes in buffer + self._s = 0 # size of uncompressed data to read in total + self._br = 0 # num uncompressed bytes read + self._cws = 0 # start byte of compression window + self._cwe = 0 # end byte of compression window + self._cs = cs # chunk size (when reading from zip) + self._close = close_on_deletion # close the memmap on deletion ? 
+ + def __del__(self): + if self._close: + self._m.close() + # END handle resource freeing + + def initialize(self, size=0): + """Initialize this instance for acting as a read-only stream for size bytes. + :param size: size in bytes to be decompresed before being depleted. + If 0, default object header information is parsed from the data, + returning a tuple of (type_string, uncompressed_size) + If not 0, the size will be used, and None is returned. + :note: must only be called exactly once""" + if size: + self._s = size + return + # END handle size + + # read header + maxb = 8192 + self._s = maxb + hdr = self.read(maxb) + hdrend = hdr.find("\0") + type, size = hdr[:hdrend].split(" ") + self._s = int(size) + + # adjust internal state to match actual header length that we ignore + # The buffer will be depleted first on future reads + self._br = 0 + hdrend += 1 # count terminating \0 + self._buf = StringIO(hdr[hdrend:]) + self._buflen = len(hdr) - hdrend + + return type, size + + def read(self, size=-1): + if size < 1: + size = self._s - self._br + else: + size = min(size, self._s - self._br) + # END clamp size + + if size == 0: + return str() + # END handle depletion + + # deplete the buffer, then just continue using the decompress object + # which has an own buffer. We just need this to transparently parse the + # header from the zlib stream + dat = str() + if self._buf: + if self._buflen >= size: + # have enough data + dat = self._buf.read(size) + self._buflen -= size + self._br += size + return dat + else: + dat = self._buf.getvalue() # ouch, duplicates data + size -= self._buflen + self._br += self._buflen + + self._buflen = 0 + self._buf = None + # END handle buffer len + # END handle buffer + + # decompress some data + # Abstract: zlib needs to operate on chunks of our memory map ( which may + # be large ), as it will otherwise and always fill in the 'unconsumed_tail' + # attribute which possible reads our whole map to the end, forcing + # everything to be read from disk even though just a portion was requested. + # As this would be a nogo, we workaround it by passing only chunks of data, + # moving the window into the memory map along as we decompress, which keeps + # the tail smaller than our chunk-size. This causes 'only' the chunk to be + # copied once, and another copy of a part of it when it creates the unconsumed + # tail. We have to use it to hand in the appropriate amount of bytes durin g + # the next read. + if self._zip.unconsumed_tail: + # move the window, make it as large as size demands. For code-clarity, + # we just take the chunk from our map again instead of reusing the unconsumed + # tail. The latter one would safe some memory copying, but we could end up + # with not getting enough data uncompressed, so we had to sort that out as well. + # Now we just assume the worst case, hence the data is uncompressed and the window + # needs to be as large as the uncompressed bytes we want to read. + self._cws = self._cwe - len(self._zip.unconsumed_tail) + self._cwe = self._cws + size + indata = self._m[self._cws:self._cwe] # another copy ... :( + else: + cws = self._cws + self._cws = self._cwe + self._cwe = cws + size + indata = self._m[self._cws:self._cwe] # ... 
copy it again :( + # END handle tail + + dcompdat = self._zip.decompress(indata, size) + self._br += len(dcompdat) + + if dat: + return dat + dcompdat + return dcompdat + #} END classes -- cgit v1.2.1 From 26e138cb47dccc859ff219f108ce9b7d96cbcbcd Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 3 Jun 2010 18:21:05 +0200 Subject: odb: fixed streamed decompression reader ( specific tests would still be missing ) and added performance tests which are extremely promising --- lib/git/odb/db.py | 4 ++-- lib/git/odb/utils.py | 22 ++++++++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 5c50a512..e656b2b5 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -91,8 +91,8 @@ class iObjectDBW(object): they where given. :param iter_info: iterable yielding tuples containing the type_string size_in_bytes and the steam with the content data. - :param dry_run: see ``to_obj`` - :param sha_as_hex: see ``to_obj`` + :param dry_run: see ``to_object`` + :param sha_as_hex: see ``to_object`` :param max_threads: if < 1, any number of threads may be started while processing the request, otherwise the given number of threads will be started. :raise IOError: if data could not be written""" diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index 8a054201..1e4a8e9d 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -39,7 +39,7 @@ write = os.write close = os.close # ZLIB configuration -# used when compressing objects +# used when compressing objects - 1 to 9 ( slowest ) Z_BEST_SPEED = 1 #} END Routines @@ -70,7 +70,7 @@ class FDCompressedSha1Writer(object): bytes_written = write(self.fd, cdata) if bytes_written != len(cdata): raise self.exc - return bytes_written + return len(data) def sha(self, as_hex = False): """:return: sha so far @@ -175,7 +175,7 @@ class DecompressMemMapReader(object): self._br += size return dat else: - dat = self._buf.getvalue() # ouch, duplicates data + dat = self._buf.read() # ouch, duplicates data size -= self._buflen self._br += self._buflen @@ -195,28 +195,34 @@ class DecompressMemMapReader(object): # copied once, and another copy of a part of it when it creates the unconsumed # tail. We have to use it to hand in the appropriate amount of bytes durin g # the next read. - if self._zip.unconsumed_tail: + tail = self._zip.unconsumed_tail + if tail: # move the window, make it as large as size demands. For code-clarity, # we just take the chunk from our map again instead of reusing the unconsumed # tail. The latter one would safe some memory copying, but we could end up # with not getting enough data uncompressed, so we had to sort that out as well. # Now we just assume the worst case, hence the data is uncompressed and the window # needs to be as large as the uncompressed bytes we want to read. - self._cws = self._cwe - len(self._zip.unconsumed_tail) + self._cws = self._cwe - len(tail) self._cwe = self._cws + size + + indata = self._m[self._cws:self._cwe] # another copy ... :( + # get the actual window end to be sure we don't use it for computations + self._cwe = self._cws + len(indata) else: cws = self._cws self._cws = self._cwe self._cwe = cws + size indata = self._m[self._cws:self._cwe] # ... 
copy it again :( # END handle tail - + dcompdat = self._zip.decompress(indata, size) - self._br += len(dcompdat) + self._br += len(dcompdat) if dat: - return dat + dcompdat + dcompdat = dat + dcompdat + return dcompdat #} END classes -- cgit v1.2.1 From 4b4a514e51fbc7dc6ddcb27c188159d57b5d1fa9 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 3 Jun 2010 19:04:18 +0200 Subject: Added performance comparison to cgit ... and yes, git-python is faster :) --- lib/git/odb/utils.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index 1e4a8e9d..94d1cea8 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -103,10 +103,12 @@ class DecompressMemMapReader(object): times we actually allocate. An own zlib implementation would be good here to better support streamed reading - it would only need to keep the mmap and decompress it into chunks, thats all ... """ - __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_cs', '_close') + __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') - def __init__(self, m, close_on_deletion, cs = 128*1024): - """Initialize with mmap and chunk_size for stream reading""" + max_read_size = 512*1024 + + def __init__(self, m, close_on_deletion): + """Initialize with mmap for stream reading""" self._m = m self._zip = zlib.decompressobj() self._buf = None # buffer of decompressed bytes @@ -115,7 +117,6 @@ class DecompressMemMapReader(object): self._br = 0 # num uncompressed bytes read self._cws = 0 # start byte of compression window self._cwe = 0 # end byte of compression window - self._cs = cs # chunk size (when reading from zip) self._close = close_on_deletion # close the memmap on deletion ? def __del__(self): @@ -163,6 +164,28 @@ class DecompressMemMapReader(object): return str() # END handle depletion + # protect from memory peaks + # If he tries to read large chunks, our memory patterns get really bad + # as we end up copying a possibly huge chunk from our memory map right into + # memory. This might not even be possible. Nonetheless, try to dampen the + # effect a bit by reading in chunks, returning a huge string in the end. + # Our performance now depends on StringIO. This way we don't need two large + # buffers in peak times, but only one large one in the end which is + # the return buffer + if size > self.max_read_size: + sio = StringIO() + while size: + read_size = min(self.max_read_size, size) + data = self.read(read_size) + sio.write(data) + size -= len(data) + if len(data) < read_size: + break + # END data loop + sio.seek(0) + return sio.getvalue() + # END handle maxread + # deplete the buffer, then just continue using the decompress object # which has an own buffer. We just need this to transparently parse the # header from the zlib stream -- cgit v1.2.1 From 1e2b46138ba58033738a24dadccc265748fce2ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 3 Jun 2010 23:20:34 +0200 Subject: commit.create_from_tree now uses pure python implementation, fixed message parsing which truncated newlines although it was ilegitimate. 
Its up to the reader to truncate therse, nowhere in the git code I could find anyone adding newlines to commits where it is written Added performance tests for serialization, it does about 5k commits per second if writing to tmpfs --- lib/git/odb/utils.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index 94d1cea8..fd340962 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -137,7 +137,7 @@ class DecompressMemMapReader(object): # END handle size # read header - maxb = 8192 + maxb = 512 # should really be enough, cgit uses 8192 I believe self._s = maxb hdr = self.read(maxb) hdrend = hdr.find("\0") @@ -172,20 +172,24 @@ class DecompressMemMapReader(object): # Our performance now depends on StringIO. This way we don't need two large # buffers in peak times, but only one large one in the end which is # the return buffer - if size > self.max_read_size: - sio = StringIO() - while size: - read_size = min(self.max_read_size, size) - data = self.read(read_size) - sio.write(data) - size -= len(data) - if len(data) < read_size: - break - # END data loop - sio.seek(0) - return sio.getvalue() - # END handle maxread + # NO: We don't do it - if the user thinks its best, he is right. If he + # has trouble, he will start reading in chunks. According to our tests + # its still faster if we read 10 Mb at once instead of chunking it. + # if size > self.max_read_size: + # sio = StringIO() + # while size: + # read_size = min(self.max_read_size, size) + # data = self.read(read_size) + # sio.write(data) + # size -= len(data) + # if len(data) < read_size: + # break + # # END data loop + # sio.seek(0) + # return sio.getvalue() + # # END handle maxread + # # deplete the buffer, then just continue using the decompress object # which has an own buffer. We just need this to transparently parse the # header from the zlib stream -- cgit v1.2.1 From 1906ee4df9ae4e734288c5203cf79894dff76cab Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 3 Jun 2010 23:27:09 +0200 Subject: Fixed compatability issues with python 2.5, made sure all tests run --- lib/git/odb/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index e656b2b5..c970410d 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -101,7 +101,7 @@ class iObjectDBW(object): # actually use multiple threads, default False of course. If the add shas = list() for args in iter_info: - shas.append(self.to_object(*args, dry_run=dry_run, sha_as_hex=sha_as_hex)) + shas.append(self.to_object(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) return shas #} END edit interface @@ -155,7 +155,7 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): # Additional Flags - might be set to 0 after the first failure # Depending on the root, this might work for some mounts, for others not, which # is why it is per instance - self._fd_open_flags = os.O_NOATIME + self._fd_open_flags = getattr(os, 'O_NOATIME', 0) #{ Interface def object_path(self, hexsha): -- cgit v1.2.1 From b01ca6a3e4ae9d944d799743c8ff774e2a7a82b6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 4 Jun 2010 00:09:00 +0200 Subject: db: implemented GitObjectDB using the git command to make sure we can lookup everything. 
Next is to implement pack-file reading, then alternates which should allow to resolve everything --- lib/git/odb/db.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index c970410d..1d1d4c40 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -281,9 +281,27 @@ class ReferenceDB(CompoundDB): """A database consisting of database referred to in a file""" -class GitObjectDB(CompoundDB, iObjectDBW): +#class GitObjectDB(CompoundDB, iObjectDBW): +class GitObjectDB(LooseObjectDB): """A database representing the default git object store, which includes loose objects, pack files and an alternates file - It will create objects only in the loose object database.""" + It will create objects only in the loose object database. + :note: for now, we use the git command to do all the lookup, just until he + have packs and the other implementations + """ + __slots__ = ('_git', ) + def __init__(self, root_path, git): + """Initialize this instance with the root and a git command""" + super(GitObjectDB, self).__init__(root_path) + self._git = git + + def object_info(self, sha): + discard, type, size = self._git.get_object_header(sha) + return type, size + + def object(self, sha): + """For now, all lookup is done by git itself""" + discard, type, size, stream = self._git.stream_object_data(sha) + return type, size, stream -- cgit v1.2.1 From a1e80445ad5cb6da4c0070d7cb8af89da3b0803b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 4 Jun 2010 14:41:15 +0200 Subject: initial version of new odb design to facilitate a channel based multi-threading implementation of all odb functions --- lib/git/odb/db.py | 114 +++++++++------ lib/git/odb/stream.py | 388 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/git/odb/utils.py | 215 ---------------------------- 3 files changed, 460 insertions(+), 257 deletions(-) create mode 100644 lib/git/odb/stream.py (limited to 'lib/git/odb') diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 1d1d4c40..7ae8f446 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -6,9 +6,12 @@ from git.errors import ( BadObjectType ) -from utils import ( +from stream import ( DecompressMemMapReader, - FDCompressedSha1Writer, + FDCompressedSha1Writer + ) + +from utils import ( ENOENT, to_hex_sha, exists, @@ -31,7 +34,7 @@ import mmap import os -class iObjectDBR(object): +class ObjectDBR(object): """Defines an interface for object database lookup. 
Objects are identified either by hex-sha (40 bytes) or by sha (20 bytes)""" @@ -48,62 +51,87 @@ class iObjectDBR(object): :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") - def object(self, sha): - """ - :return: tuple(type_string, size_in_bytes, stream) a tuple with object - information including its type, its size as well as a stream from which its - contents can be read + def info(self, sha): + """ :return: ODB_Info instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") - def object_info(self, sha): - """ - :return: tuple(type_string, size_in_bytes) tuple with the object's type - string as well as its size in bytes + def info_async(self, input_channel): + """Retrieve information of a multitude of objects asynchronously + :param input_channel: Channel yielding the sha's of the objects of interest + :return: Channel yielding ODB_Info|InvalidODB_Info, in any order""" + raise NotImplementedError("To be implemented in subclass") + + def stream(self, sha): + """:return: ODB_OStream instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") + + def stream_async(self, input_channel): + """Retrieve the ODB_OStream of multiple objects + :param input_channel: see ``info`` + :param max_threads: see ``ObjectDBW.store`` + :return: Channel yielding ODB_OStream|InvalidODB_OStream instances in any order""" + raise NotImplementedError("To be implemented in subclass") #} END query interface -class iObjectDBW(object): +class ObjectDBW(object): """Defines an interface to create objects in the database""" - __slots__ = tuple() + __slots__ = "_ostream" + + def __init__(self, *args, **kwargs): + self._ostream = None #{ Edit Interface + def set_ostream(self, stream): + """Adjusts the stream to which all data should be sent when storing new objects + :param stream: if not None, the stream to use, if None the default stream + will be used. + :return: previously installed stream, or None if there was no override + :raise TypeError: if the stream doesn't have the supported functionality""" + cstream = self._ostream + self._ostream = stream + return cstream + + def ostream(self): + """:return: overridden output stream this instance will write to, or None + if it will write to the default stream""" + return self._ostream - def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): + def store(self, istream): """Create a new object in the database - :return: the sha identifying the object in the database - :param type: type string identifying the object - :param size: size of the data to read from stream - :param stream: stream providing the data - :param dry_run: if True, the object database will not actually be changed - :param sha_as_hex: if True, the returned sha identifying the object will be - hex encoded, not binary + :return: the input istream object with its sha set to its corresponding value + :param istream: ODB_IStream compatible instance. If its sha is already set + to a value, the object will just be stored in the our database format, + in which case the input stream is expected to be in object format ( header + contents ). 
:raise IOError: if data could not be written""" raise NotImplementedError("To be implemented in subclass") - def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0): - """Create multiple new objects in the database - :return: sequence of shas identifying the created objects in the order in which - they where given. - :param iter_info: iterable yielding tuples containing the type_string - size_in_bytes and the steam with the content data. - :param dry_run: see ``to_object`` - :param sha_as_hex: see ``to_object`` - :param max_threads: if < 1, any number of threads may be started while processing - the request, otherwise the given number of threads will be started. - :raise IOError: if data could not be written""" + def store_async(self, input_channel): + """Create multiple new objects in the database asynchronously. The method will + return right away, returning an output channel which receives the results as + they are computed. + + :return: Channel yielding your ODB_IStream which served as input, in any order. + The IStreams sha will be set to the sha it received during the process, + or its error attribute will be set to the exception informing about the error. + :param input_channel: Channel yielding ODB_IStream instance. + As the same instances will be used in the output channel, you can create a map + between the id(istream) -> istream + :note:As some ODB implementations implement this operation as atomic, they might + abort the whole operation if one item could not be processed. Hence check how + many items have actually been produced.""" # a trivial implementation, ignoring the threads for now # TODO: add configuration to the class to determine whether we may # actually use multiple threads, default False of course. If the add shas = list() for args in iter_info: - shas.append(self.to_object(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) + shas.append(self.store(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) return shas - + #} END edit interface @@ -118,6 +146,7 @@ class FileDBBase(object): :raise InvalidDBRoot: :note: The base will perform basic checking for accessability, but the subclass is required to verify that the root_path contains the database structure it needs""" + super(FileDBBase, self).__init__() if not os.path.isdir(root_path): raise InvalidDBRoot(root_path) self._root_path = root_path @@ -141,7 +170,7 @@ class FileDBBase(object): #} END utilities -class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): +class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): """A database which operates on loose object files""" __slots__ = ('_hexsha_to_file', '_fd_open_flags') # CONFIGURATION @@ -210,7 +239,7 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): os.close(fd) # END assure file is closed - def object_info(self, sha): + def info(self, sha): m = self._map_loose_object(sha) try: return loose_object_header_info(m) @@ -233,8 +262,9 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): return False # END check existance - def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True): + def store(self, istream): # open a tmp file to write the data to + # todo: implement ostream properly fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) writer = FDCompressedSha1Writer(fd) @@ -269,11 +299,11 @@ class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW): return sha -class PackedDB(FileDBBase, iObjectDBR): +class PackedDB(FileDBBase, ObjectDBR): """A database operating on a set of object packs""" -class 
CompoundDB(iObjectDBR): +class CompoundDB(ObjectDBR): """A database which delegates calls to sub-databases""" @@ -281,7 +311,7 @@ class ReferenceDB(CompoundDB): """A database consisting of database referred to in a file""" -#class GitObjectDB(CompoundDB, iObjectDBW): +#class GitObjectDB(CompoundDB, ObjectDBW): class GitObjectDB(LooseObjectDB): """A database representing the default git object store, which includes loose objects, pack files and an alternates file @@ -296,7 +326,7 @@ class GitObjectDB(LooseObjectDB): super(GitObjectDB, self).__init__(root_path) self._git = git - def object_info(self, sha): + def info(self, sha): discard, type, size = self._git.get_object_header(sha) return type, size diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py new file mode 100644 index 00000000..325c1444 --- /dev/null +++ b/lib/git/odb/stream.py @@ -0,0 +1,388 @@ +import zlib +from cStringIO import StringIO +from git.utils import make_sha +import errno + +from utils import ( + to_hex_sha, + to_bin_sha + ) + +__all__ = ('FDCompressedSha1Writer', 'DecompressMemMapReader') + + +# ZLIB configuration +# used when compressing objects - 1 to 9 ( slowest ) +Z_BEST_SPEED = 1 + + +#{ ODB Bases + +class ODB_Info(tuple): + """Carries information about an object in an ODB, provdiing information + about the sha of the object, the type_string as well as the uncompressed size + in bytes. + + It can be accessed using tuple notation and using attribute access notation:: + + assert dbi[0] == dbi.sha + assert dbi[1] == dbi.type + assert dbi[2] == dbi.size + + The type is designed to be as lighteight as possible.""" + __slots__ = tuple() + + def __new__(cls, sha, type, size): + return tuple.__new__(cls, (sha, type, size)) + + def __init__(self, sha, type, size): + pass + + #{ Interface + @property + def sha(self): + return self[0] + + @property + def type(self): + return self[1] + + @property + def size(self): + return self[2] + #} END interface + + +class ODB_OStream(ODB_Info): + """Base for object streams retrieved from the database, providing additional + information about the stream. + Generally, ODB streams are read-only as objects are immutable""" + __slots__ = tuple() + + def __new__(cls, sha, type, size, *args, **kwargs): + """Helps with the initialization of subclasses""" + return tuple.__new__(cls, (sha, type, size)) + + def is_compressed(self): + """:return: True if reads of this stream yield zlib compressed data. + :note: this does not imply anything about the actual internal storage. + Hence the data could be uncompressed, but read compressed, or vice versa""" + raise NotImplementedError("To be implemented by subclass") + + +class ODB_IStream(list): + """Represents an input content stream to be fed into the ODB. It is mutable to allow + the ODB to record information about the operations outcome right in this instance. + + It provides interfaces for the ODB_OStream and a StreamReader to allow the instance + to blend in without prior conversion. 
+ + The only method your content stream must support is 'read'""" + __slots__ = tuple() + + def __new__(cls, type, size, stream, sha=None, compressed=False): + list.__new__(cls, (sha, type, size, stream, compressed, None)) + + def __init__(cls, type, size, stream, sha=None, compressed=None): + pass + + #{ Interface + + def hexsha(self): + """:return: our sha, hex encoded, 40 bytes""" + return to_hex_sha(self[0]) + + def binsha(self): + """:return: our sha as binary, 20 bytes""" + return to_bin_sha(self[0]) + + def _error(self): + """:return: the error that occurred when processing the stream, or None""" + return self[5] + + def _set_error(self, exc): + """Set this input stream to the given exc, may be None to reset the error""" + self[5] = exc + + error = property(_error, _set_error) + + #} END interface + + #{ Stream Reader Interface + + def read(self, size=-1): + """Implements a simple stream reader interface, passing the read call on + to our internal stream""" + return self[3].read(size) + + #} END stream reader interface + + #{ interface + + def _set_sha(self, sha): + self[0] = sha + + def _sha(self): + return self[0] + + sha = property(_sha, _set_sha) + + @property + def type(self): + return self[1] + + @property + def size(self): + return self[2] + + #} END odb info interface + + #{ ODB_OStream interface + + def is_compressed(self): + return self[4] + + #} END ODB_OStream interface + + +class InvalidODB_Info(tuple): + """Carries information about a sha identifying an object which is invalid in + the queried database. The exception attribute provides more information about + the cause of the issue""" + __slots__ = tuple() + + def __new__(cls, sha, exc): + return tuple.__new__(cls, (sha, exc)) + + def __init__(self, sha, exc): + pass + + @property + def sha(self): + return self[0] + + @property + def error(self): + """:return: exception instance explaining the failure""" + return self[1] + +class InvalidODB_OStream(InvalidODB_Info): + """Carries information about an invalid ODB stream""" + __slots__ = tuple() + +#} END ODB Bases + + +#{ RO Streams + +class DecompressMemMapReader(ODB_OStream): + """Reads data in chunks from a memory map and decompresses it. The client sees + only the uncompressed data, respective file-like read calls are handling on-demand + buffered decompression accordingly + + A constraint on the total size of bytes is activated, simulating + a logical file within a possibly larger physical memory area + + To read efficiently, you clearly don't want to read individual bytes, instead, + read a few kilobytes at least. + + :note: The chunk-size should be carefully selected as it will involve quite a bit + of string copying due to the way the zlib is implemented. Its very wasteful, + hence we try to find a good tradeoff between allocation time and number of + times we actually allocate. An own zlib implementation would be good here + to better support streamed reading - it would only need to keep the mmap + and decompress it into chunks, thats all ... 
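# A standard-library sketch of what the header parsing in initialize() below
# boils down to: loose objects are zlib-deflated b"<type> <size>\x00<payload>".
# The function name is illustrative only, not code from the patch.
import zlib

def loose_object_header(raw):
    # decompress only a small prefix, mirroring the 512 byte header read below
    start = zlib.decompressobj().decompress(raw, 512)
    hdr_end = start.index(b"\x00")
    type_name, size = start[:hdr_end].split(b" ")
    return type_name, int(size)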
""" + # __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') + + max_read_size = 512*1024 + + def __init__(self, sha, type, size, m, close_on_deletion): + """Initialize with mmap for stream reading""" + self._m = m + self._zip = zlib.decompressobj() + self._buf = None # buffer of decompressed bytes + self._buflen = 0 # length of bytes in buffer + self._s = 0 # size of uncompressed data to read in total + self._br = 0 # num uncompressed bytes read + self._cws = 0 # start byte of compression window + self._cwe = 0 # end byte of compression window + self._close = close_on_deletion # close the memmap on deletion ? + + def __del__(self): + if self._close: + self._m.close() + # END handle resource freeing + + def initialize(self, size=0): + """Initialize this instance for acting as a read-only stream for size bytes. + :param size: size in bytes to be decompresed before being depleted. + If 0, default object header information is parsed from the data, + returning a tuple of (type_string, uncompressed_size) + If not 0, the size will be used, and None is returned. + :note: must only be called exactly once""" + if size: + self._s = size + return + # END handle size + + # read header + maxb = 512 # should really be enough, cgit uses 8192 I believe + self._s = maxb + hdr = self.read(maxb) + hdrend = hdr.find("\0") + type, size = hdr[:hdrend].split(" ") + self._s = int(size) + + # adjust internal state to match actual header length that we ignore + # The buffer will be depleted first on future reads + self._br = 0 + hdrend += 1 # count terminating \0 + self._buf = StringIO(hdr[hdrend:]) + self._buflen = len(hdr) - hdrend + + return type, size + + def read(self, size=-1): + if size < 1: + size = self._s - self._br + else: + size = min(size, self._s - self._br) + # END clamp size + + if size == 0: + return str() + # END handle depletion + + # protect from memory peaks + # If he tries to read large chunks, our memory patterns get really bad + # as we end up copying a possibly huge chunk from our memory map right into + # memory. This might not even be possible. Nonetheless, try to dampen the + # effect a bit by reading in chunks, returning a huge string in the end. + # Our performance now depends on StringIO. This way we don't need two large + # buffers in peak times, but only one large one in the end which is + # the return buffer + # NO: We don't do it - if the user thinks its best, he is right. If he + # has trouble, he will start reading in chunks. According to our tests + # its still faster if we read 10 Mb at once instead of chunking it. + + # if size > self.max_read_size: + # sio = StringIO() + # while size: + # read_size = min(self.max_read_size, size) + # data = self.read(read_size) + # sio.write(data) + # size -= len(data) + # if len(data) < read_size: + # break + # # END data loop + # sio.seek(0) + # return sio.getvalue() + # # END handle maxread + # + # deplete the buffer, then just continue using the decompress object + # which has an own buffer. 
We just need this to transparently parse the + # header from the zlib stream + dat = str() + if self._buf: + if self._buflen >= size: + # have enough data + dat = self._buf.read(size) + self._buflen -= size + self._br += size + return dat + else: + dat = self._buf.read() # ouch, duplicates data + size -= self._buflen + self._br += self._buflen + + self._buflen = 0 + self._buf = None + # END handle buffer len + # END handle buffer + + # decompress some data + # Abstract: zlib needs to operate on chunks of our memory map ( which may + # be large ), as it will otherwise and always fill in the 'unconsumed_tail' + # attribute which possible reads our whole map to the end, forcing + # everything to be read from disk even though just a portion was requested. + # As this would be a nogo, we workaround it by passing only chunks of data, + # moving the window into the memory map along as we decompress, which keeps + # the tail smaller than our chunk-size. This causes 'only' the chunk to be + # copied once, and another copy of a part of it when it creates the unconsumed + # tail. We have to use it to hand in the appropriate amount of bytes durin g + # the next read. + tail = self._zip.unconsumed_tail + if tail: + # move the window, make it as large as size demands. For code-clarity, + # we just take the chunk from our map again instead of reusing the unconsumed + # tail. The latter one would safe some memory copying, but we could end up + # with not getting enough data uncompressed, so we had to sort that out as well. + # Now we just assume the worst case, hence the data is uncompressed and the window + # needs to be as large as the uncompressed bytes we want to read. + self._cws = self._cwe - len(tail) + self._cwe = self._cws + size + + + indata = self._m[self._cws:self._cwe] # another copy ... :( + # get the actual window end to be sure we don't use it for computations + self._cwe = self._cws + len(indata) + else: + cws = self._cws + self._cws = self._cwe + self._cwe = cws + size + indata = self._m[self._cws:self._cwe] # ... 
copy it again :( + # END handle tail + + dcompdat = self._zip.decompress(indata, size) + + self._br += len(dcompdat) + if dat: + dcompdat = dat + dcompdat + + return dcompdat + +#} END RO streams + + +#{ W Streams + +class FDCompressedSha1Writer(object): + """Digests data written to it, making the sha available, then compress the + data and write it to the file descriptor + :note: operates on raw file descriptors + :note: for this to work, you have to use the close-method of this instance""" + __slots__ = ("fd", "sha1", "zip") + + # default exception + exc = IOError("Failed to write all bytes to filedescriptor") + + def __init__(self, fd): + self.fd = fd + self.sha1 = make_sha("") + self.zip = zlib.compressobj(Z_BEST_SPEED) + + def write(self, data): + """:raise IOError: If not all bytes could be written + :return: lenght of incoming data""" + self.sha1.update(data) + cdata = self.zip.compress(data) + bytes_written = write(self.fd, cdata) + if bytes_written != len(cdata): + raise self.exc + return len(data) + + def sha(self, as_hex = False): + """:return: sha so far + :param as_hex: if True, sha will be hex-encoded, binary otherwise""" + if as_hex: + return self.sha1.hexdigest() + return self.sha1.digest() + + def close(self): + remainder = self.zip.flush() + if write(self.fd, remainder) != len(remainder): + raise self.exc + return close(self.fd) + + +#} END W streams diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index fd340962..61565ba9 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -1,10 +1,6 @@ import binascii import os -import zlib -from cStringIO import StringIO -from git.utils import make_sha import errno -from fun import chunk_size __all__ = ('FDSha1Writer', ) @@ -38,218 +34,7 @@ read = os.read write = os.write close = os.close -# ZLIB configuration -# used when compressing objects - 1 to 9 ( slowest ) -Z_BEST_SPEED = 1 #} END Routines -#{ Classes - -class FDCompressedSha1Writer(object): - """Digests data written to it, making the sha available, then compress the - data and write it to the file descriptor - :note: operates on raw file descriptors - :note: for this to work, you have to use the close-method of this instance""" - __slots__ = ("fd", "sha1", "zip") - - # default exception - exc = IOError("Failed to write all bytes to filedescriptor") - - def __init__(self, fd): - self.fd = fd - self.sha1 = make_sha("") - self.zip = zlib.compressobj(Z_BEST_SPEED) - - def write(self, data): - """:raise IOError: If not all bytes could be written - :return: lenght of incoming data""" - self.sha1.update(data) - cdata = self.zip.compress(data) - bytes_written = write(self.fd, cdata) - if bytes_written != len(cdata): - raise self.exc - return len(data) - - def sha(self, as_hex = False): - """:return: sha so far - :param as_hex: if True, sha will be hex-encoded, binary otherwise""" - if as_hex: - return self.sha1.hexdigest() - return self.sha1.digest() - - def close(self): - remainder = self.zip.flush() - if write(self.fd, remainder) != len(remainder): - raise self.exc - return close(self.fd) - - -class DecompressMemMapReader(object): - """Reads data in chunks from a memory map and decompresses it. 
The client sees - only the uncompressed data, respective file-like read calls are handling on-demand - buffered decompression accordingly - - A constraint on the total size of bytes is activated, simulating - a logical file within a possibly larger physical memory area - - To read efficiently, you clearly don't want to read individual bytes, instead, - read a few kilobytes at least. - - :note: The chunk-size should be carefully selected as it will involve quite a bit - of string copying due to the way the zlib is implemented. Its very wasteful, - hence we try to find a good tradeoff between allocation time and number of - times we actually allocate. An own zlib implementation would be good here - to better support streamed reading - it would only need to keep the mmap - and decompress it into chunks, thats all ... """ - __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') - - max_read_size = 512*1024 - - def __init__(self, m, close_on_deletion): - """Initialize with mmap for stream reading""" - self._m = m - self._zip = zlib.decompressobj() - self._buf = None # buffer of decompressed bytes - self._buflen = 0 # length of bytes in buffer - self._s = 0 # size of uncompressed data to read in total - self._br = 0 # num uncompressed bytes read - self._cws = 0 # start byte of compression window - self._cwe = 0 # end byte of compression window - self._close = close_on_deletion # close the memmap on deletion ? - - def __del__(self): - if self._close: - self._m.close() - # END handle resource freeing - - def initialize(self, size=0): - """Initialize this instance for acting as a read-only stream for size bytes. - :param size: size in bytes to be decompresed before being depleted. - If 0, default object header information is parsed from the data, - returning a tuple of (type_string, uncompressed_size) - If not 0, the size will be used, and None is returned. - :note: must only be called exactly once""" - if size: - self._s = size - return - # END handle size - - # read header - maxb = 512 # should really be enough, cgit uses 8192 I believe - self._s = maxb - hdr = self.read(maxb) - hdrend = hdr.find("\0") - type, size = hdr[:hdrend].split(" ") - self._s = int(size) - - # adjust internal state to match actual header length that we ignore - # The buffer will be depleted first on future reads - self._br = 0 - hdrend += 1 # count terminating \0 - self._buf = StringIO(hdr[hdrend:]) - self._buflen = len(hdr) - hdrend - - return type, size - - def read(self, size=-1): - if size < 1: - size = self._s - self._br - else: - size = min(size, self._s - self._br) - # END clamp size - - if size == 0: - return str() - # END handle depletion - - # protect from memory peaks - # If he tries to read large chunks, our memory patterns get really bad - # as we end up copying a possibly huge chunk from our memory map right into - # memory. This might not even be possible. Nonetheless, try to dampen the - # effect a bit by reading in chunks, returning a huge string in the end. - # Our performance now depends on StringIO. This way we don't need two large - # buffers in peak times, but only one large one in the end which is - # the return buffer - # NO: We don't do it - if the user thinks its best, he is right. If he - # has trouble, he will start reading in chunks. According to our tests - # its still faster if we read 10 Mb at once instead of chunking it. 
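# The sliding-window decompression discussed in the comments here can be
# sketched with plain zlib; decompress_windowed is a hypothetical helper, not
# code from the patch.
import zlib

def decompress_windowed(m, want, window=8192):
    # pull up to `want` decompressed bytes from the buffer/mmap `m`, never
    # feeding zlib more than `window` compressed bytes at once so that its
    # unconsumed_tail stays small
    d = zlib.decompressobj()
    out, pos, need = [], 0, want
    while need > 0:
        chunk = d.unconsumed_tail or m[pos:pos + window]
        if not d.unconsumed_tail:
            pos += len(chunk)
        if not chunk:
            break                       # compressed input exhausted
        data = d.decompress(chunk, need)
        out.append(data)
        need -= len(data)
    return b"".join(out)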
- - # if size > self.max_read_size: - # sio = StringIO() - # while size: - # read_size = min(self.max_read_size, size) - # data = self.read(read_size) - # sio.write(data) - # size -= len(data) - # if len(data) < read_size: - # break - # # END data loop - # sio.seek(0) - # return sio.getvalue() - # # END handle maxread - # - # deplete the buffer, then just continue using the decompress object - # which has an own buffer. We just need this to transparently parse the - # header from the zlib stream - dat = str() - if self._buf: - if self._buflen >= size: - # have enough data - dat = self._buf.read(size) - self._buflen -= size - self._br += size - return dat - else: - dat = self._buf.read() # ouch, duplicates data - size -= self._buflen - self._br += self._buflen - - self._buflen = 0 - self._buf = None - # END handle buffer len - # END handle buffer - - # decompress some data - # Abstract: zlib needs to operate on chunks of our memory map ( which may - # be large ), as it will otherwise and always fill in the 'unconsumed_tail' - # attribute which possible reads our whole map to the end, forcing - # everything to be read from disk even though just a portion was requested. - # As this would be a nogo, we workaround it by passing only chunks of data, - # moving the window into the memory map along as we decompress, which keeps - # the tail smaller than our chunk-size. This causes 'only' the chunk to be - # copied once, and another copy of a part of it when it creates the unconsumed - # tail. We have to use it to hand in the appropriate amount of bytes durin g - # the next read. - tail = self._zip.unconsumed_tail - if tail: - # move the window, make it as large as size demands. For code-clarity, - # we just take the chunk from our map again instead of reusing the unconsumed - # tail. The latter one would safe some memory copying, but we could end up - # with not getting enough data uncompressed, so we had to sort that out as well. - # Now we just assume the worst case, hence the data is uncompressed and the window - # needs to be as large as the uncompressed bytes we want to read. - self._cws = self._cwe - len(tail) - self._cwe = self._cws + size - - - indata = self._m[self._cws:self._cwe] # another copy ... :( - # get the actual window end to be sure we don't use it for computations - self._cwe = self._cws + len(indata) - else: - cws = self._cws - self._cws = self._cwe - self._cwe = cws + size - indata = self._m[self._cws:self._cwe] # ... 
copy it again :( - # END handle tail - - dcompdat = self._zip.decompress(indata, size) - - self._br += len(dcompdat) - if dat: - dcompdat = dat + dcompdat - - return dcompdat - -#} END classes -- cgit v1.2.1 From e746f96bcc29238b79118123028ca170adc4ff0f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 4 Jun 2010 17:22:08 +0200 Subject: Fixed implementation after design change to deal with it - all tests run, but next there will have to be more through testing --- lib/git/odb/__init__.py | 4 ++ lib/git/odb/db.py | 116 ++++++++++++++++----------------- lib/git/odb/fun.py | 46 ++++++------- lib/git/odb/stream.py | 168 ++++++++++++++++++++++++++++++++---------------- lib/git/odb/utils.py | 2 - 5 files changed, 195 insertions(+), 141 deletions(-) (limited to 'lib/git/odb') diff --git a/lib/git/odb/__init__.py b/lib/git/odb/__init__.py index 17000244..5789d7eb 100644 --- a/lib/git/odb/__init__.py +++ b/lib/git/odb/__init__.py @@ -1,2 +1,6 @@ """Initialize the object database module""" +# default imports +from db import * +from stream import * + diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py index 7ae8f446..a8de28ec 100644 --- a/lib/git/odb/db.py +++ b/lib/git/odb/db.py @@ -8,7 +8,10 @@ from git.errors import ( from stream import ( DecompressMemMapReader, - FDCompressedSha1Writer + FDCompressedSha1Writer, + Sha1Writer, + OStream, + OInfo ) from utils import ( @@ -34,11 +37,13 @@ import mmap import os +__all__ = ('ObjectDBR', 'ObjectDBW', 'FileDBBase', 'LooseObjectDB', 'PackedDB', + 'CompoundDB', 'ReferenceDB', 'GitObjectDB' ) + class ObjectDBR(object): """Defines an interface for object database lookup. Objects are identified either by hex-sha (40 bytes) or by sha (20 bytes)""" - __slots__ = tuple() def __contains__(self, sha): return self.has_obj @@ -52,7 +57,7 @@ class ObjectDBR(object): raise NotImplementedError("To be implemented in subclass") def info(self, sha): - """ :return: ODB_Info instance + """ :return: OInfo instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") @@ -60,27 +65,26 @@ class ObjectDBR(object): def info_async(self, input_channel): """Retrieve information of a multitude of objects asynchronously :param input_channel: Channel yielding the sha's of the objects of interest - :return: Channel yielding ODB_Info|InvalidODB_Info, in any order""" + :return: Channel yielding OInfo|InvalidOInfo, in any order""" raise NotImplementedError("To be implemented in subclass") def stream(self, sha): - """:return: ODB_OStream instance + """:return: OStream instance :param sha: 40 bytes hexsha or 20 bytes binary sha :raise BadObject:""" raise NotImplementedError("To be implemented in subclass") def stream_async(self, input_channel): - """Retrieve the ODB_OStream of multiple objects + """Retrieve the OStream of multiple objects :param input_channel: see ``info`` :param max_threads: see ``ObjectDBW.store`` - :return: Channel yielding ODB_OStream|InvalidODB_OStream instances in any order""" + :return: Channel yielding OStream|InvalidOStream instances in any order""" raise NotImplementedError("To be implemented in subclass") #} END query interface class ObjectDBW(object): """Defines an interface to create objects in the database""" - __slots__ = "_ostream" def __init__(self, *args, **kwargs): self._ostream = None @@ -99,12 +103,12 @@ class ObjectDBW(object): def ostream(self): """:return: overridden output stream this instance will write to, or None if it will write to the default stream""" - return 
self._ostream + return self._ostream def store(self, istream): """Create a new object in the database :return: the input istream object with its sha set to its corresponding value - :param istream: ODB_IStream compatible instance. If its sha is already set + :param istream: IStream compatible instance. If its sha is already set to a value, the object will just be stored in the our database format, in which case the input stream is expected to be in object format ( header + contents ). :raise IOError: if data could not be written""" @@ -115,22 +119,16 @@ class ObjectDBW(object): return right away, returning an output channel which receives the results as they are computed. - :return: Channel yielding your ODB_IStream which served as input, in any order. + :return: Channel yielding your IStream which served as input, in any order. The IStreams sha will be set to the sha it received during the process, or its error attribute will be set to the exception informing about the error. - :param input_channel: Channel yielding ODB_IStream instance. + :param input_channel: Channel yielding IStream instance. As the same instances will be used in the output channel, you can create a map between the id(istream) -> istream :note:As some ODB implementations implement this operation as atomic, they might abort the whole operation if one item could not be processed. Hence check how many items have actually been produced.""" - # a trivial implementation, ignoring the threads for now - # TODO: add configuration to the class to determine whether we may - # actually use multiple threads, default False of course. If the add - shas = list() - for args in iter_info: - shas.append(self.store(dry_run=dry_run, sha_as_hex=sha_as_hex, *args)) - return shas + raise NotImplementedError("To be implemented in subclass") #} END edit interface @@ -138,7 +136,6 @@ class ObjectDBW(object): class FileDBBase(object): """Provides basic facilities to retrieve files of interest, including caching facilities to help mapping hexsha's to objects""" - __slots__ = ('_root_path', ) def __init__(self, root_path): """Initialize this instance to look for its files at the given root path @@ -164,15 +161,11 @@ class FileDBBase(object): return join(self._root_path, rela_path) #} END interface - #{ Utiltities - - - #} END utilities class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): """A database which operates on loose object files""" - __slots__ = ('_hexsha_to_file', '_fd_open_flags') + # CONFIGURATION # chunks in which data will be copied between streams stream_chunk_size = chunk_size @@ -238,21 +231,26 @@ class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): finally: os.close(fd) # END assure file is closed + + def set_ostream(self, stream): + """:raise TypeError: if the stream does not support the Sha1Writer interface""" + if stream is not None and not isinstance(stream, Sha1Writer): + raise TypeError("Output stream musst support the %s interface" % Sha1Writer.__name__) + return super(LooseObjectDB, self).set_ostream(stream) def info(self, sha): m = self._map_loose_object(sha) try: - return loose_object_header_info(m) + type, size = loose_object_header_info(m) + return OInfo(sha, type, size) finally: m.close() # END assure release of system resources - def object(self, sha): + def stream(self, sha): m = self._map_loose_object(sha) - reader = DecompressMemMapReader(m, close_on_deletion = True) - type, size = reader.initialize() - - return type, size, reader + type, size, stream = DecompressMemMapReader.new(m, close_on_deletion = True) + 
return OStream(sha, type, size, stream) def has_object(self, sha): try: @@ -263,27 +261,33 @@ class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): # END check existance def store(self, istream): - # open a tmp file to write the data to - # todo: implement ostream properly - fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) - writer = FDCompressedSha1Writer(fd) + """note: The sha we produce will be hex by nature""" + assert istream.sha is None, "Direct istream writing not yet implemented" + tmp_path = None + writer = self.ostream() + if writer is None: + # open a tmp file to write the data to + fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path) + writer = FDCompressedSha1Writer(fd) + # END handle custom writer try: - write_object(type, size, stream, writer, - close_target_stream=True, chunk_size=self.stream_chunk_size) - except: - os.remove(tmp_path) - raise - # END assure tmpfile removal on error - + try: + write_object(istream.type, istream.size, istream.read, writer.write, + chunk_size=self.stream_chunk_size) + except: + if tmp_path: + os.remove(tmp_path) + raise + # END assure tmpfile removal on error + finally: + if tmp_path: + writer.close() + # END assure target stream is closed - # in dry-run mode, we delete the file afterwards sha = writer.sha(as_hex=True) - if dry_run: - os.remove(tmp_path) - else: - # rename the file into place + if tmp_path: obj_path = self.db_path(self.object_path(sha)) obj_dir = dirname(obj_path) if not isdir(obj_dir): @@ -292,11 +296,8 @@ class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW): rename(tmp_path, obj_path) # END handle dry_run - if not sha_as_hex: - sha = hex_to_bin(sha) - # END handle sha format - - return sha + istream.sha = sha + return istream class PackedDB(FileDBBase, ObjectDBR): @@ -320,18 +321,17 @@ class GitObjectDB(LooseObjectDB): :note: for now, we use the git command to do all the lookup, just until he have packs and the other implementations """ - __slots__ = ('_git', ) def __init__(self, root_path, git): """Initialize this instance with the root and a git command""" super(GitObjectDB, self).__init__(root_path) self._git = git def info(self, sha): - discard, type, size = self._git.get_object_header(sha) - return type, size + t = self._git.get_object_header(sha) + return OInfo(t[0], t[1], t[2]) - def object(self, sha): + def stream(self, sha): """For now, all lookup is done by git itself""" - discard, type, size, stream = self._git.stream_object_data(sha) - return type, size, stream + t = self._git.stream_object_data(sha) + return OStream(t[0], t[1], t[2], t[3]) diff --git a/lib/git/odb/fun.py b/lib/git/odb/fun.py index ee7144dd..870a6f02 100644 --- a/lib/git/odb/fun.py +++ b/lib/git/odb/fun.py @@ -21,6 +21,8 @@ type_id_to_type_map = { # used when dealing with larger streams chunk_size = 1000*1000 +__all__ = ('is_loose_object', 'loose_object_header_info', 'object_header_info', + 'write_object' ) #{ Routines @@ -73,42 +75,34 @@ def object_header_info(m): raise BadObjectType(type_id) # END handle exceptions -def write_object(type, size, source_stream, target_stream, close_target_stream=True, - chunk_size=chunk_size): +def write_object(type, size, read, write, chunk_size=chunk_size): """Write the object as identified by type, size and source_stream into the target_stream :param type: type string of the object :param size: amount of bytes to write from source_stream - :param source_stream: stream as file-like object providing at least size bytes - :param target_stream: stream as file-like object to 
receive the data + :param read: read method of a stream providing the content data + :param write: write method of the output stream :param close_target_stream: if True, the target stream will be closed when the routine exits, even if an error is thrown - :param chunk_size: size of chunks to read from source. Larger values can be beneficial - for io performance, but cost more memory as well :return: The actual amount of bytes written to stream, which includes the header and a trailing newline""" tbw = 0 # total num bytes written dbw = 0 # num data bytes written - try: - # WRITE HEADER: type SP size NULL - tbw += target_stream.write("%s %i\0" % (type, size)) - - # WRITE ALL DATA UP TO SIZE - while True: - cs = min(chunk_size, size-dbw) - data_len = target_stream.write(source_stream.read(cs)) - dbw += data_len - if data_len < cs or dbw == size: - tbw += dbw - break - # END check for stream end - # END duplicate data - return tbw - finally: - if close_target_stream: - target_stream.close() - # END handle stream closing - # END assure file was closed + # WRITE HEADER: type SP size NULL + tbw += write("%s %i\0" % (type, size)) + + # WRITE ALL DATA UP TO SIZE + while True: + cs = min(chunk_size, size-dbw) + data_len = write(read(cs)) + dbw += data_len + if data_len < cs or dbw == size: + tbw += dbw + break + # END check for stream end + # END duplicate data + return tbw + #} END routines diff --git a/lib/git/odb/stream.py b/lib/git/odb/stream.py index 325c1444..d1181382 100644 --- a/lib/git/odb/stream.py +++ b/lib/git/odb/stream.py @@ -5,10 +5,13 @@ import errno from utils import ( to_hex_sha, - to_bin_sha + to_bin_sha, + write, + close ) -__all__ = ('FDCompressedSha1Writer', 'DecompressMemMapReader') +__all__ = ('OInfo', 'OStream', 'IStream', 'InvalidOInfo', 'InvalidOStream', + 'DecompressMemMapReader', 'FDCompressedSha1Writer') # ZLIB configuration @@ -18,7 +21,7 @@ Z_BEST_SPEED = 1 #{ ODB Bases -class ODB_Info(tuple): +class OInfo(tuple): """Carries information about an object in an ODB, provdiing information about the sha of the object, the type_string as well as the uncompressed size in bytes. @@ -35,8 +38,8 @@ class ODB_Info(tuple): def __new__(cls, sha, type, size): return tuple.__new__(cls, (sha, type, size)) - def __init__(self, sha, type, size): - pass + def __init__(self, *args): + tuple.__init__(self) #{ Interface @property @@ -53,38 +56,52 @@ class ODB_Info(tuple): #} END interface -class ODB_OStream(ODB_Info): +class OStream(OInfo): """Base for object streams retrieved from the database, providing additional information about the stream. Generally, ODB streams are read-only as objects are immutable""" __slots__ = tuple() - def __new__(cls, sha, type, size, *args, **kwargs): + def __new__(cls, sha, type, size, stream, *args, **kwargs): """Helps with the initialization of subclasses""" - return tuple.__new__(cls, (sha, type, size)) + return tuple.__new__(cls, (sha, type, size, stream)) + + + def __init__(self, *args, **kwargs): + tuple.__init__(self) + #{ Interface def is_compressed(self): - """:return: True if reads of this stream yield zlib compressed data. + """:return: True if reads of this stream yield zlib compressed data. Default False :note: this does not imply anything about the actual internal storage. 
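# The header written by write_object() ("<type> <size>\0") is the same byte
# sequence git hashes to form the object id.  A small illustration with
# hashlib/zlib; the function name is made up for this sketch.
import hashlib, zlib

def hash_and_compress(type_name, body):
    header = ("%s %d\0" % (type_name, len(body))).encode("ascii")
    sha = hashlib.sha1(header + body).hexdigest()
    return sha, zlib.compress(header + body, 1)     # 1 == Z_BEST_SPEED

sha, raw = hash_and_compress("blob", b"hello git\n")
# `git hash-object --stdin` fed the same bytes prints the same sha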
Hence the data could be uncompressed, but read compressed, or vice versa""" - raise NotImplementedError("To be implemented by subclass") + raise False + + #} END interface + + #{ Stream Reader Interface + + def read(self, size=-1): + return self[3].read(size) + + #} END stream reader interface -class ODB_IStream(list): +class IStream(list): """Represents an input content stream to be fed into the ODB. It is mutable to allow the ODB to record information about the operations outcome right in this instance. - It provides interfaces for the ODB_OStream and a StreamReader to allow the instance + It provides interfaces for the OStream and a StreamReader to allow the instance to blend in without prior conversion. The only method your content stream must support is 'read'""" __slots__ = tuple() def __new__(cls, type, size, stream, sha=None, compressed=False): - list.__new__(cls, (sha, type, size, stream, compressed, None)) + return list.__new__(cls, (sha, type, size, stream, compressed, None)) - def __init__(cls, type, size, stream, sha=None, compressed=None): - pass + def __init__(self, type, size, stream, sha=None, compressed=None): + list.__init__(self, (sha, type, size, stream, compressed, None)) #{ Interface @@ -127,25 +144,42 @@ class ODB_IStream(list): sha = property(_sha, _set_sha) - @property - def type(self): + + def _type(self): return self[1] + + def _set_type(self, type): + self[1] = type - @property - def size(self): + type = property(_type, _set_type) + + def _size(self): return self[2] + + def _set_size(self, size): + self[2] = size + + size = property(_size, _set_size) + + def _stream(self): + return self[3] + + def _set_stream(self, stream): + self[3] = stream + + stream = property(_stream, _set_stream) #} END odb info interface - #{ ODB_OStream interface + #{ OStream interface def is_compressed(self): return self[4] - #} END ODB_OStream interface + #} END OStream interface -class InvalidODB_Info(tuple): +class InvalidOInfo(tuple): """Carries information about a sha identifying an object which is invalid in the queried database. The exception attribute provides more information about the cause of the issue""" @@ -155,7 +189,7 @@ class InvalidODB_Info(tuple): return tuple.__new__(cls, (sha, exc)) def __init__(self, sha, exc): - pass + tuple.__init__(self, (sha, exc)) @property def sha(self): @@ -166,7 +200,8 @@ class InvalidODB_Info(tuple): """:return: exception instance explaining the failure""" return self[1] -class InvalidODB_OStream(InvalidODB_Info): + +class InvalidOStream(InvalidOInfo): """Carries information about an invalid ODB stream""" __slots__ = tuple() @@ -175,7 +210,7 @@ class InvalidODB_OStream(InvalidODB_Info): #{ RO Streams -class DecompressMemMapReader(ODB_OStream): +class DecompressMemMapReader(object): """Reads data in chunks from a memory map and decompresses it. The client sees only the uncompressed data, respective file-like read calls are handling on-demand buffered decompression accordingly @@ -192,17 +227,17 @@ class DecompressMemMapReader(ODB_OStream): times we actually allocate. An own zlib implementation would be good here to better support streamed reading - it would only need to keep the mmap and decompress it into chunks, thats all ... 
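# Hypothetical usage of the reworked reader, assuming the module layout from
# this patch is importable; `loose_path` is an illustrative variable, not a
# value from the patch.
import mmap, os
from git.odb.stream import DecompressMemMapReader

fd = os.open(loose_path, os.O_RDONLY)
m = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
os.close(fd)                     # the map keeps its own reference

type_name, size, stream = DecompressMemMapReader.new(m, close_on_deletion=True)
data = stream.read()             # uncompressed payload, header already consumed
assert len(data) == size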
""" - # __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') + __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close') max_read_size = 512*1024 - def __init__(self, sha, type, size, m, close_on_deletion): + def __init__(self, m, close_on_deletion, size): """Initialize with mmap for stream reading""" self._m = m self._zip = zlib.decompressobj() self._buf = None # buffer of decompressed bytes self._buflen = 0 # length of bytes in buffer - self._s = 0 # size of uncompressed data to read in total + self._s = size # size of uncompressed data to read in total self._br = 0 # num uncompressed bytes read self._cws = 0 # start byte of compression window self._cwe = 0 # end byte of compression window @@ -213,34 +248,33 @@ class DecompressMemMapReader(ODB_OStream): self._m.close() # END handle resource freeing - def initialize(self, size=0): - """Initialize this instance for acting as a read-only stream for size bytes. - :param size: size in bytes to be decompresed before being depleted. - If 0, default object header information is parsed from the data, - returning a tuple of (type_string, uncompressed_size) - If not 0, the size will be used, and None is returned. - :note: must only be called exactly once""" - if size: - self._s = size - return - # END handle size + @classmethod + def new(self, m, close_on_deletion=False): + """Create a new DecompressMemMapReader instance for acting as a read-only stream + This method parses the object header from m and returns the parsed + type and size, as well as the created stream instance. + :param m: memory map on which to oparate + :param close_on_deletion: if True, the memory map will be closed once we are + being deleted""" + inst = DecompressMemMapReader(m, close_on_deletion, 0) # read header maxb = 512 # should really be enough, cgit uses 8192 I believe - self._s = maxb - hdr = self.read(maxb) + inst._s = maxb + hdr = inst.read(maxb) hdrend = hdr.find("\0") type, size = hdr[:hdrend].split(" ") - self._s = int(size) + size = int(size) + inst._s = size # adjust internal state to match actual header length that we ignore # The buffer will be depleted first on future reads - self._br = 0 + inst._br = 0 hdrend += 1 # count terminating \0 - self._buf = StringIO(hdr[hdrend:]) - self._buflen = len(hdr) - hdrend + inst._buf = StringIO(hdr[hdrend:]) + inst._buflen = len(hdr) - hdrend - return type, size + return type, size, inst def read(self, size=-1): if size < 1: @@ -346,7 +380,35 @@ class DecompressMemMapReader(ODB_OStream): #{ W Streams -class FDCompressedSha1Writer(object): +class Sha1Writer(object): + """Simple stream writer which produces a sha whenever you like as it degests + everything it is supposed to write""" + + def __init__(self): + self.sha1 = make_sha("") + + #{ Stream Interface + + def write(self, data): + """:raise IOError: If not all bytes could be written + :return: lenght of incoming data""" + self.sha1.update(data) + return len(data) + + # END stream interface + + #{ Interface + + def sha(self, as_hex = False): + """:return: sha so far + :param as_hex: if True, sha will be hex-encoded, binary otherwise""" + if as_hex: + return self.sha1.hexdigest() + return self.sha1.digest() + + #} END interface + +class FDCompressedSha1Writer(Sha1Writer): """Digests data written to it, making the sha available, then compress the data and write it to the file descriptor :note: operates on raw file descriptors @@ -357,10 +419,12 @@ class FDCompressedSha1Writer(object): exc = IOError("Failed to write 
all bytes to filedescriptor") def __init__(self, fd): + super(FDCompressedSha1Writer, self).__init__() self.fd = fd - self.sha1 = make_sha("") self.zip = zlib.compressobj(Z_BEST_SPEED) + #{ Stream Interface + def write(self, data): """:raise IOError: If not all bytes could be written :return: lenght of incoming data""" @@ -371,18 +435,12 @@ class FDCompressedSha1Writer(object): raise self.exc return len(data) - def sha(self, as_hex = False): - """:return: sha so far - :param as_hex: if True, sha will be hex-encoded, binary otherwise""" - if as_hex: - return self.sha1.hexdigest() - return self.sha1.digest() - def close(self): remainder = self.zip.flush() if write(self.fd, remainder) != len(remainder): raise self.exc return close(self.fd) + #} END stream interface #} END W streams diff --git a/lib/git/odb/utils.py b/lib/git/odb/utils.py index 61565ba9..6863e97b 100644 --- a/lib/git/odb/utils.py +++ b/lib/git/odb/utils.py @@ -2,8 +2,6 @@ import binascii import os import errno -__all__ = ('FDSha1Writer', ) - #{ Routines hex_to_bin = binascii.a2b_hex -- cgit v1.2.1
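# A condensed sketch of the loose-object write path built from the writers
# above: hash while compressing into a temporary file, then rename the file
# into its sha-addressed location.  `store_loose` and `root` are illustrative
# names; the patch itself streams chunks through FDCompressedSha1Writer inside
# LooseObjectDB.store() and verifies every write.
import hashlib, os, tempfile, zlib

def store_loose(root, type_name, body):
    header = ("%s %d\0" % (type_name, len(body))).encode("ascii")
    sha = hashlib.sha1(header + body).hexdigest()
    fd, tmp = tempfile.mkstemp(prefix="obj", dir=root)
    try:
        os.write(fd, zlib.compress(header + body, 1))   # 1 == Z_BEST_SPEED
    finally:
        os.close(fd)
    obj_dir = os.path.join(root, sha[:2])               # git's fan-out layout
    if not os.path.isdir(obj_dir):
        os.mkdir(obj_dir)
    os.rename(tmp, os.path.join(obj_dir, sha[2:]))
    return sha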