author    Sebastian Thiel <byronimo@gmail.com>    2011-05-05 19:44:36 +0200
committer Sebastian Thiel <byronimo@gmail.com>    2011-05-05 19:44:36 +0200
commit    e03093dd392b92f51b7d7cf66d7b1949b9f843e6
tree      dbd2b85d8b201cb5f2848b3aaa11cffbc96031c2
parent    6463c10db377573e695bc504a9451bdb6cbf61f5
Removed plenty of code which went into git-python. This is just for completeness; gitdb doesn't need to be worked on anymore. (branch: gitdbmerger)
-rw-r--r--  gitdb/config.py                        | 411
-rw-r--r--  gitdb/exc.py                           |  26
-rw-r--r--  gitdb/object/base.py                   | 172
-rw-r--r--  gitdb/object/commit.py                 | 253
-rw-r--r--  gitdb/object/fun.py                    | 197
-rw-r--r--  gitdb/object/submodule.py              |   8
-rw-r--r--  gitdb/object/tag.py                    |  72
-rw-r--r--  gitdb/object/tree.py                   | 281
-rw-r--r--  gitdb/object/util.py                   | 308
-rw-r--r--  gitdb/ref/head.py                      |  15
-rw-r--r--  gitdb/ref/headref.py                   |  83
-rw-r--r--  gitdb/ref/log.py                       | 278
-rw-r--r--  gitdb/ref/reference.py                 |  81
-rw-r--r--  gitdb/ref/remote.py                    |  46
-rw-r--r--  gitdb/ref/symbolic.py                  | 652
-rw-r--r--  gitdb/ref/tag.py                       |  49
-rw-r--r--  gitdb/test/__init__.py                 |   9
-rw-r--r--  gitdb/test/db/test_base.py             |  17
-rw-r--r--  gitdb/test/lib.py                      | 199
-rw-r--r--  gitdb/test/object/test_blob.py         |  21
-rw-r--r--  gitdb/test/object/test_commit.py       | 274
-rw-r--r--  gitdb/test/object/test_tree.py         | 143
-rw-r--r--  gitdb/test/performance/lib.py          |  48
-rw-r--r--  gitdb/test/performance/test_pack.py    |  89
-rw-r--r--  gitdb/test/performance/test_stream.py  | 189
-rw-r--r--  gitdb/test/test_base.py                |  91
-rw-r--r--  gitdb/test/test_config.py              | 101
-rw-r--r--  gitdb/test/test_example.py             |  63
-rw-r--r--  gitdb/test/test_pack.py                | 246
-rw-r--r--  gitdb/test/test_refs.py                |  39
-rw-r--r--  gitdb/test/test_util.py                | 125
-rw-r--r--  gitdb/util.py                          | 741
32 files changed, 22 insertions(+), 5305 deletions(-)
diff --git a/gitdb/config.py b/gitdb/config.py
index a8efe46..9347d9c 100644
--- a/gitdb/config.py
+++ b/gitdb/config.py
@@ -7,415 +7,4 @@
"""Module containing module parser implementation able to properly read and write
configuration files"""
-import re
-import os
-import ConfigParser as cp
-import inspect
-import cStringIO
-from gitdb.odict import OrderedDict
-from gitdb.util import LockFile
-
-__all__ = ('GitConfigParser', 'SectionConstraint')
-
-class MetaParserBuilder(type):
- """Utlity class wrapping base-class methods into decorators that assure read-only properties"""
- def __new__(metacls, name, bases, clsdict):
- """
- Equip all base-class methods with a needs_values decorator, and all non-const methods
- with a set_dirty_and_flush_changes decorator in addition to that."""
- kmm = '_mutating_methods_'
- if kmm in clsdict:
- mutating_methods = clsdict[kmm]
- for base in bases:
- methods = ( t for t in inspect.getmembers(base, inspect.ismethod) if not t[0].startswith("_") )
- for name, method in methods:
- if name in clsdict:
- continue
- method_with_values = needs_values(method)
- if name in mutating_methods:
- method_with_values = set_dirty_and_flush_changes(method_with_values)
- # END mutating methods handling
-
- clsdict[name] = method_with_values
- # END for each name/method pair
- # END for each base
- # END if mutating methods configuration is set
-
- new_type = super(MetaParserBuilder, metacls).__new__(metacls, name, bases, clsdict)
- return new_type
-
-
-
-def needs_values(func):
- """Returns method assuring we read values (on demand) before we try to access them"""
- def assure_data_present(self, *args, **kwargs):
- self.read()
- return func(self, *args, **kwargs)
- # END wrapper method
- assure_data_present.__name__ = func.__name__
- return assure_data_present
-
-def set_dirty_and_flush_changes(non_const_func):
- """Return method that checks whether given non constant function may be called.
- If so, the instance will be set dirty.
- Additionally, we flush the changes right to disk"""
- def flush_changes(self, *args, **kwargs):
- rval = non_const_func(self, *args, **kwargs)
- self.write()
- return rval
- # END wrapper method
- flush_changes.__name__ = non_const_func.__name__
- return flush_changes
-
-
-class SectionConstraint(object):
- """Constrains a ConfigParser to only option commands which are constrained to
- always use the section we have been initialized with.
-
- It supports all ConfigParser methods that operate on an option"""
- __slots__ = ("_config", "_section_name")
- _valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option",
- "remove_section", "remove_option", "options")
-
- def __init__(self, config, section):
- self._config = config
- self._section_name = section
-
- def __getattr__(self, attr):
- if attr in self._valid_attrs_:
- return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)
- return super(SectionConstraint,self).__getattribute__(attr)
-
- def _call_config(self, method, *args, **kwargs):
- """Call the configuration at the given method which must take a section name
- as first argument"""
- return getattr(self._config, method)(self._section_name, *args, **kwargs)
-
- @property
- def config(self):
- """return: Configparser instance we constrain"""
- return self._config
-
-
-class GitConfigParser(cp.RawConfigParser, object):
- """Implements specifics required to read git style configuration files.
-
- This variation behaves much like the git-config command such that the configuration
- will be read on demand based on the filepath given during initialization.
-
- The changes will automatically be written once the instance goes out of scope, but
- can be triggered manually as well.
-
- The configuration file will be locked if you intend to change values, preventing other
- instances from writing concurrently.
-
- :note:
- The config is case-sensitive even when queried, hence section and option names
- must match perfectly."""
- __metaclass__ = MetaParserBuilder
-
-
- #{ Configuration
- # The lock type determines the type of lock to use in new configuration readers.
- # They must be compatible to the LockFile interface.
- # A suitable alternative would be the BlockingLockFile
- t_lock = LockFile
-
- #} END configuration
-
- OPTCRE = re.compile(
- r'\s?(?P<option>[^:=\s][^:=]*)' # very permissive, including leading whitespace
- r'\s*(?P<vi>[:=])\s*' # any number of space/tab,
- # followed by separator
- # (either : or =), followed
- # by any # space/tab
- r'(?P<value>.*)$' # everything up to eol
- )
-
- # list of RawConfigParser methods able to change the instance
- _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
- __slots__ = ("_sections", "_defaults", "_file_or_files", "_read_only","_is_initialized", '_lock')
-
- def __init__(self, file_or_files, read_only=True):
- """Initialize a configuration reader to read the given file_or_files and to
- possibly allow changes to it by setting read_only False
-
- :param file_or_files:
- A single file path or file objects or multiple of these
-
- :param read_only:
- If True, the ConfigParser may only read the data, but not change it.
- If False, only a single file path or file object may be given."""
- super(GitConfigParser, self).__init__()
- # initialize base with ordered dictionaries to be sure we write the same
- # file back
- self._sections = OrderedDict()
- self._defaults = OrderedDict()
-
- self._file_or_files = file_or_files
- self._read_only = read_only
- self._is_initialized = False
- self._lock = None
-
- if not read_only:
- if isinstance(file_or_files, (tuple, list)):
- raise ValueError("Write-ConfigParsers can operate on a single file only, multiple files have been passed")
- # END single file check
-
- if not isinstance(file_or_files, basestring):
- file_or_files = file_or_files.name
- # END get filename from handle/stream
- # initialize lock base - we want to write
- self._lock = self.t_lock(file_or_files)
-
- self._lock._obtain_lock()
- # END read-only check
-
-
- def __del__(self):
- """Write pending changes if required and release locks"""
- # checking for the lock here makes sure we do not raise during write()
- # in case an invalid parser was created that could not get a lock
- if self.read_only or not self._lock._has_lock():
- return
-
- try:
- try:
- self.write()
- except IOError,e:
- print "Exception during destruction of GitConfigParser: %s" % str(e)
- finally:
- self._lock._release_lock()
-
- def optionxform(self, optionstr):
- """Do not transform options in any way when writing"""
- return optionstr
-
- def _read(self, fp, fpname):
- """A direct copy of the py2.4 version of the super class's _read method
- to assure it uses ordered dicts. Had to change one line to make it work.
-
- Future versions have this fixed, but in fact it's quite embarrassing for the
- guys not to have done it right in the first place!
-
- Removed big comments to make it more compact.
-
- Made sure it ignores initial whitespace as git uses tabs"""
- cursect = None # None, or a dictionary
- optname = None
- lineno = 0
- e = None # None, or an exception
- while True:
- line = fp.readline()
- if not line:
- break
- lineno = lineno + 1
- # comment or blank line?
- if line.strip() == '' or line[0] in '#;':
- continue
- if line.split(None, 1)[0].lower() == 'rem' and line[0] in "rR":
- # no leading whitespace
- continue
- else:
- # is it a section header?
- mo = self.SECTCRE.match(line)
- if mo:
- sectname = mo.group('header')
- if sectname in self._sections:
- cursect = self._sections[sectname]
- elif sectname == cp.DEFAULTSECT:
- cursect = self._defaults
- else:
- # THE ONLY LINE WE CHANGED !
- cursect = OrderedDict((('__name__', sectname),))
- self._sections[sectname] = cursect
- # So sections can't start with a continuation line
- optname = None
- # no section header in the file?
- elif cursect is None:
- raise cp.MissingSectionHeaderError(fpname, lineno, line)
- # an option line?
- else:
- mo = self.OPTCRE.match(line)
- if mo:
- optname, vi, optval = mo.group('option', 'vi', 'value')
- if vi in ('=', ':') and ';' in optval:
- pos = optval.find(';')
- if pos != -1 and optval[pos-1].isspace():
- optval = optval[:pos]
- optval = optval.strip()
- if optval == '""':
- optval = ''
- optname = self.optionxform(optname.rstrip())
- cursect[optname] = optval
- else:
- if not e:
- e = cp.ParsingError(fpname)
- e.append(lineno, repr(line))
- # END
- # END ?
- # END ?
- # END while reading
- # if any parsing errors occurred, raise an exception
- if e:
- raise e
-
-
- def read(self):
- """Reads the data stored in the files we have been initialized with. It will
- ignore files that cannot be read, possibly leaving an empty configuration
-
- :return: Nothing
- :raise IOError: if a file cannot be handled"""
- if self._is_initialized:
- return
-
- files_to_read = self._file_or_files
- if not isinstance(files_to_read, (tuple, list)):
- files_to_read = [ files_to_read ]
-
- for file_object in files_to_read:
- fp = file_object
- close_fp = False
- # assume a path if it is not a file-object
- if not hasattr(file_object, "seek"):
- try:
- fp = open(file_object)
- close_fp = True
- except IOError,e:
- continue
- # END fp handling
-
- try:
- self._read(fp, fp.name)
- finally:
- if close_fp:
- fp.close()
- # END read-handling
- # END for each file object to read
- self._is_initialized = True
-
- def _write(self, fp):
- """Write an .ini-format representation of the configuration state in
- git compatible format"""
- def write_section(name, section_dict):
- fp.write("[%s]\n" % name)
- for (key, value) in section_dict.items():
- if key != "__name__":
- fp.write("\t%s = %s\n" % (key, str(value).replace('\n', '\n\t')))
- # END if key is not __name__
- # END section writing
-
- if self._defaults:
- write_section(cp.DEFAULTSECT, self._defaults)
- map(lambda t: write_section(t[0],t[1]), self._sections.items())
-
-
- @needs_values
- def write(self):
- """Write changes to our file, if there are changes at all
-
- :raise IOError: if this is a read-only writer instance or if we could not obtain
- a file lock"""
- self._assure_writable("write")
-
- fp = self._file_or_files
- close_fp = False
-
- # we have a physical file on disk, so get a lock
- if isinstance(fp, (basestring, file)):
- self._lock._obtain_lock()
- # END get lock for physical files
-
- if not hasattr(fp, "seek"):
- fp = open(self._file_or_files, "w")
- close_fp = True
- else:
- fp.seek(0)
- # END handle stream or file
-
- # WRITE DATA
- try:
- self._write(fp)
- finally:
- if close_fp:
- fp.close()
- # END data writing
-
- # we do not release the lock - it will be done automatically once the
- # instance vanishes
-
- def _assure_writable(self, method_name):
- if self.read_only:
- raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))
-
- @needs_values
- @set_dirty_and_flush_changes
- def add_section(self, section):
- """Assures added options will stay in order"""
- super(GitConfigParser, self).add_section(section)
- self._sections[section] = OrderedDict()
-
- @property
- def read_only(self):
- """:return: True if this instance may change the configuration file"""
- return self._read_only
-
- def get_value(self, section, option, default = None):
- """
- :param default:
- If not None, the given default value will be returned in case
- the option did not exist
- :return: a properly typed value, either int, float or string
-
- :raise TypeError: in case the value could not be understood.
- Otherwise the exceptions known to the ConfigParser will be raised."""
- try:
- valuestr = self.get(section, option)
- except Exception:
- if default is not None:
- return default
- raise
-
- types = ( long, float )
- for numtype in types:
- try:
- val = numtype( valuestr )
-
- # truncated value ?
- if val != float( valuestr ):
- continue
-
- return val
- except (ValueError,TypeError):
- continue
- # END for each numeric type
-
- # try boolean values as git uses them
- vl = valuestr.lower()
- if vl == 'false':
- return False
- if vl == 'true':
- return True
-
- if not isinstance( valuestr, basestring ):
- raise TypeError( "Invalid value type: only int, long, float and str are allowed", valuestr )
-
- return valuestr
-
- @needs_values
- @set_dirty_and_flush_changes
- def set_value(self, section, option, value):
- """Sets the given option in section to the given value.
- It will create the section if required, and will not throw as opposed to the default
- ConfigParser 'set' method.
-
- :param section: Name of the section in which the option resides or should reside
- :param option: Name of the option whose value to set
-
- :param value: Value to set the option to. It must be a string or convertible
- to a string"""
- if not self.has_section(section):
- self.add_section(section)
- self.set(section, option, str(value))
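A minimal usage sketch of the GitConfigParser removed above (Python 2, matching the code); the path .git/config and the value written are only illustrative assumptions:

    from gitdb.config import GitConfigParser

    # read-only parser: may take several files, values are read on first access
    cr = GitConfigParser([".git/config"], read_only=True)
    print cr.get_value("core", "bare", default=False)   # typed via 'true'/'false' handling

    # writable parser: single file only; the file is locked, and every mutating
    # call is wrapped by set_dirty_and_flush_changes, i.e. flushed immediately
    cw = GitConfigParser(".git/config", read_only=False)
    cw.set_value("user", "name", "Example Author")      # creates the section if needed
    del cw                                              # __del__ writes pending changes, releases the lock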
diff --git a/gitdb/exc.py b/gitdb/exc.py
index e087047..89a686a 100644
--- a/gitdb/exc.py
+++ b/gitdb/exc.py
@@ -3,30 +3,4 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module with common exceptions"""
-from util import to_hex_sha
-class ODBError(Exception):
- """All errors thrown by the object database"""
-
-class InvalidDBRoot(ODBError):
- """Thrown if an object database cannot be initialized at the given path"""
-
-class BadObject(ODBError):
- """The object with the given SHA does not exist. Instantiate with the
- failed sha"""
-
- def __str__(self):
- return "BadObject: %s" % to_hex_sha(self.args[0])
-
-class ParseError(ODBError):
- """Thrown if the parsing of a file failed due to an invalid format"""
-
-class AmbiguousObjectName(ODBError):
- """Thrown if a possibly shortened name does not uniquely represent a single object
- in the database"""
-
-class BadObjectType(ODBError):
- """The object had an unsupported type"""
-
-class UnsupportedOperation(ODBError):
- """Thrown if the given operation cannot be supported by the object database"""
diff --git a/gitdb/object/base.py b/gitdb/object/base.py
index ee76da1..e8158c9 100644
--- a/gitdb/object/base.py
+++ b/gitdb/object/base.py
@@ -3,176 +3,4 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from util import get_object_type_by_name
-from gitdb.util import (
- hex_to_bin,
- bin_to_hex,
- dirname,
- basename,
- LazyMixin,
- join_path_native,
- stream_copy
- )
-from gitdb.typ import ObjectType
-
-_assertion_msg_format = "Created object %r whose python type %r disagrees with the actual git object type %r"
-
-__all__ = ("Object", "IndexObject")
-
-class Object(LazyMixin):
- """Implements an Object which may be Blobs, Trees, Commits and Tags"""
- NULL_HEX_SHA = '0'*40
- NULL_BIN_SHA = '\0'*20
-
- TYPES = (ObjectType.blob, ObjectType.tree, ObjectType.commit, ObjectType.tag)
- __slots__ = ("odb", "binsha", "size" )
-
- type = None # to be set by subclass
- type_id = None # to be set by subclass
-
- def __init__(self, odb, binsha):
- """Initialize an object by identifying it by its binary sha.
- All keyword arguments will be set on demand if None.
-
- :param odb: repository this object is located in
-
- :param binsha: 20 byte SHA1"""
- super(Object,self).__init__()
- self.odb = odb
- self.binsha = binsha
- assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (binsha, len(binsha))
-
- @classmethod
- def new(cls, odb, id):
- """
- :return: New Object instance of a type appropriate to the object type behind
- id. The id of the newly created object will be a binsha even though
- the input id may have been a Reference or Rev-Spec
-
- :param id: reference, rev-spec, or hexsha
-
- :note: This cannot be a __new__ method as it would always call __init__
- with the input id which is not necessarily a binsha."""
- return odb.rev_parse(str(id))
-
- @classmethod
- def new_from_sha(cls, odb, sha1):
- """
- :return: new object instance of a type appropriate to represent the given
- binary sha1
- :param sha1: 20 byte binary sha1"""
- if sha1 == cls.NULL_BIN_SHA:
- # the NULL binsha is always the root commit
- return get_object_type_by_name('commit')(odb, sha1)
- #END handle special case
- oinfo = odb.info(sha1)
- inst = get_object_type_by_name(oinfo.type)(odb, oinfo.binsha)
- inst.size = oinfo.size
- return inst
-
- def _set_cache_(self, attr):
- """Retrieve object information"""
- if attr == "size":
- oinfo = self.odb.info(self.binsha)
- self.size = oinfo.size
- # assert oinfo.type == self.type, _assertion_msg_format % (self.binsha, oinfo.type, self.type)
- else:
- super(Object,self)._set_cache_(attr)
-
- def __eq__(self, other):
- """:return: True if the objects have the same SHA1"""
- return self.binsha == other.binsha
-
- def __ne__(self, other):
- """:return: True if the objects do not have the same SHA1 """
- return self.binsha != other.binsha
-
- def __hash__(self):
- """:return: Hash of our id allowing objects to be used in dicts and sets"""
- return hash(self.binsha)
-
- def __str__(self):
- """:return: string of our SHA1 as understood by all git commands"""
- return bin_to_hex(self.binsha)
-
- def __repr__(self):
- """:return: string with pythonic representation of our object"""
- return '<git.%s "%s">' % (self.__class__.__name__, self.hexsha)
-
- @property
- def hexsha(self):
- """:return: 40 byte hex version of our 20 byte binary sha"""
- return bin_to_hex(self.binsha)
-
- @property
- def data_stream(self):
- """ :return: File Object compatible stream to the uncompressed raw data of the object
- :note: returned streams must be read in order"""
- return self.odb.stream(self.binsha)
-
- def stream_data(self, ostream):
- """Writes our data directly to the given output stream
- :param ostream: File object compatible stream object.
- :return: self"""
- istream = self.odb.stream(self.binsha)
- stream_copy(istream, ostream)
- return self
-
-
-class IndexObject(Object):
- """Base for all objects that can be part of the index file , namely Tree, Blob and
- SubModule objects"""
- __slots__ = ("path", "mode")
-
- # for compatibility with iterable lists
- _id_attribute_ = 'path'
-
- def __init__(self, odb, binsha, mode=None, path=None):
- """Initialize a newly instanced IndexObject
- :param odb: is the object database we are located in
- :param binsha: 20 byte sha1
- :param mode: is the stat compatible file mode as int, use the stat module
- to evaluate the information
- :param path:
- is the path to the file in the file system, relative to the git repository root, i.e.
- file.ext or folder/other.ext
- :note:
- Path may not be set if the index object has been created directly, as it cannot
- be retrieved without knowing the parent tree."""
- super(IndexObject, self).__init__(odb, binsha)
- if mode is not None:
- self.mode = mode
- if path is not None:
- self.path = path
-
- def __hash__(self):
- """:return:
- Hash of our path as index items are uniquely identifyable by path, not
- by their data !"""
- return hash(self.path)
-
- def _set_cache_(self, attr):
- if attr in IndexObject.__slots__:
- # they cannot be retrieved later on (not without searching for them)
- raise AttributeError( "path and mode attributes must have been set during %s object creation" % type(self).__name__ )
- else:
- super(IndexObject, self)._set_cache_(attr)
- # END handle slot attribute
-
- @property
- def name(self):
- """:return: Name portion of the path, effectively being the basename"""
- return basename(self.path)
-
- @property
- def abspath(self):
- """
- :return:
- Absolute path to this index object in the file system ( as opposed to the
- .path field which is a path relative to the git repository ).
-
- The returned path will be native to the system and contains '\' on windows. """
- assert False, "Only works if repository is not bare - provide this check in an interface"
- return join_path_native(dirname(self.odb.root_path()), self.path)
-
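An illustrative sketch of the removed Object interface; odb stands in for some gitdb object database instance (an assumption, not shown here), and the hexsha is the one of this very commit:

    from gitdb.util import hex_to_bin
    from gitdb.object.base import Object

    binsha = hex_to_bin("e03093dd392b92f51b7d7cf66d7b1949b9f843e6")
    obj = Object.new_from_sha(odb, binsha)    # concrete type is chosen via odb.info()
    print obj.hexsha, obj.size                # size is fetched lazily through _set_cache_
    obj.stream_data(open("/tmp/raw", "wb"))   # copies the raw object data into the stream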
diff --git a/gitdb/object/commit.py b/gitdb/object/commit.py
index 7f3d9e4..cb0c639 100644
--- a/gitdb/object/commit.py
+++ b/gitdb/object/commit.py
@@ -3,257 +3,4 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from gitdb.typ import ObjectType
-from tree import Tree
-from cStringIO import StringIO
-import base
-from gitdb.util import (
- hex_to_bin,
- Actor,
- )
-from util import (
- Traversable,
- Serializable,
- altz_to_utctz_str,
- parse_actor_and_date
- )
-import sys
-
-__all__ = ('Commit', )
-
-class Commit(base.Object, Traversable, Serializable):
- """Wraps a git Commit object.
-
- This class will act lazily on some of its attributes and will query the
- value on demand only if it involves calling the git binary."""
-
- # ENVIRONMENT VARIABLES
- # read when creating new commits
- env_author_date = "GIT_AUTHOR_DATE"
- env_committer_date = "GIT_COMMITTER_DATE"
-
- # CONFIGURATION KEYS
- conf_encoding = 'i18n.commitencoding'
-
- # INVARIANTS
- default_encoding = "UTF-8"
-
-
- # object configuration
- type = ObjectType.commit
- type_id = ObjectType.commit_id
-
- __slots__ = ("tree",
- "author", "authored_date", "author_tz_offset",
- "committer", "committed_date", "committer_tz_offset",
- "message", "parents", "encoding")
- _id_attribute_ = "binsha"
-
- def __init__(self, odb, binsha, tree=None, author=None, authored_date=None, author_tz_offset=None,
- committer=None, committed_date=None, committer_tz_offset=None,
- message=None, parents=None, encoding=None):
- """Instantiate a new Commit. All keyword arguments taking None as default will
- be implicitly set on first query.
-
- :param binsha: 20 byte sha1
- :param parents: tuple( Commit, ... )
- is a tuple of commit ids or actual Commits
- :param tree: Tree
- Tree object
- :param author: Actor
- is the author string ( will be implicitly converted into an Actor object )
- :param authored_date: int_seconds_since_epoch
- is the authored DateTime - use time.gmtime() to convert it into a
- different format
- :param author_tz_offset: int_seconds_west_of_utc
- is the timezone that the authored_date is in
- :param committer: Actor
- is the committer string
- :param committed_date: int_seconds_since_epoch
- is the committed DateTime - use time.gmtime() to convert it into a
- different format
- :param committer_tz_offset: int_seconds_west_of_utc
- is the timezone that the committed_date is in
- :param message: string
- is the commit message
- :param encoding: string
- encoding of the message, defaults to UTF-8
- :param parents:
- List or tuple of Commit objects which are our parent(s) in the commit
- dependency graph
- :return: git.Commit
-
- :note: Timezone information is in the same format and in the same sign
- as what time.altzone returns. The sign is inverted compared to git's
- UTC timezone."""
- super(Commit,self).__init__(odb, binsha)
- if tree is not None:
- assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
- if tree is not None:
- self.tree = tree
- if author is not None:
- self.author = author
- if authored_date is not None:
- self.authored_date = authored_date
- if author_tz_offset is not None:
- self.author_tz_offset = author_tz_offset
- if committer is not None:
- self.committer = committer
- if committed_date is not None:
- self.committed_date = committed_date
- if committer_tz_offset is not None:
- self.committer_tz_offset = committer_tz_offset
- if message is not None:
- self.message = message
- if parents is not None:
- self.parents = parents
- if encoding is not None:
- self.encoding = encoding
-
- @classmethod
- def _get_intermediate_items(cls, commit):
- return commit.parents
-
- def _set_cache_(self, attr):
- if attr in Commit.__slots__:
- # read the data in a chunk, it's faster - then provide a file wrapper
- binsha, typename, self.size, stream = self.odb.stream(self.binsha)
- self._deserialize(StringIO(stream.read()))
- else:
- super(Commit, self)._set_cache_(attr)
- # END handle attrs
-
- @property
- def summary(self):
- """:return: First line of the commit message"""
- return self.message.split('\n', 1)[0]
-
- @classmethod
- def _iter_from_process_or_stream(cls, odb, proc_or_stream):
- """Parse out commit information into a list of Commit objects
- We expect one line per commit, and parse the actual commit information directly
- from our lightning fast object database
-
- :param proc: git-rev-list process instance - one sha per line
- :return: iterator returning Commit objects"""
- stream = proc_or_stream
- if not hasattr(stream,'readline'):
- stream = proc_or_stream.stdout
-
- readline = stream.readline
- while True:
- line = readline()
- if not line:
- break
- hexsha = line.strip()
- if len(hexsha) > 40:
- # split additional information, as returned by bisect for instance
- hexsha, rest = line.split(None, 1)
- # END handle extra info
-
- assert len(hexsha) == 40, "Invalid line: %s" % hexsha
- yield cls(odb, hex_to_bin(hexsha))
- # END for each line in stream
-
- #{ Serializable Implementation
-
- def _serialize(self, stream):
- write = stream.write
- write("tree %s\n" % self.tree)
- for p in self.parents:
- write("parent %s\n" % p)
-
- a = self.author
- aname = a.name
- if isinstance(aname, unicode):
- aname = aname.encode(self.encoding)
- # END handle unicode in name
-
- c = self.committer
- fmt = "%s %s <%s> %s %s\n"
- write(fmt % ("author", aname, a.email,
- self.authored_date,
- altz_to_utctz_str(self.author_tz_offset)))
-
- # encode committer
- aname = c.name
- if isinstance(aname, unicode):
- aname = aname.encode(self.encoding)
- # END handle unicode in name
- write(fmt % ("committer", aname, c.email,
- self.committed_date,
- altz_to_utctz_str(self.committer_tz_offset)))
-
- if self.encoding != self.default_encoding:
- write("encoding %s\n" % self.encoding)
-
- write("\n")
-
- # write plain bytes, be sure its encoded according to our encoding
- if isinstance(self.message, unicode):
- write(self.message.encode(self.encoding))
- else:
- write(self.message)
- # END handle encoding
- return self
-
- def _deserialize(self, stream):
- """:param from_rev_list: if true, the stream format is coming from the rev-list command
- Otherwise it is assumed to be a plain data stream from our object"""
- readline = stream.readline
- self.tree = Tree(self.odb, hex_to_bin(readline().split()[1]), Tree.tree_id<<12, '')
-
- self.parents = list()
- next_line = None
- while True:
- parent_line = readline()
- if not parent_line.startswith('parent'):
- next_line = parent_line
- break
- # END abort reading parents
- self.parents.append(type(self)(self.odb, hex_to_bin(parent_line.split()[-1])))
- # END for each parent line
- self.parents = tuple(self.parents)
-
- self.author, self.authored_date, self.author_tz_offset = parse_actor_and_date(next_line)
- self.committer, self.committed_date, self.committer_tz_offset = parse_actor_and_date(readline())
-
-
- # now we can have the encoding line, or an empty line followed by the optional
- # message.
- self.encoding = self.default_encoding
- # read encoding or empty line to separate message
- enc = readline()
- enc = enc.strip()
- if enc:
- self.encoding = enc[enc.find(' ')+1:]
- # now comes the message separator
- readline()
- # END handle encoding
-
- # decode the authors name
- try:
- self.author.name = self.author.name.decode(self.encoding)
- except UnicodeDecodeError:
- print >> sys.stderr, "Failed to decode author name '%s' using encoding %s" % (self.author.name, self.encoding)
- # END handle author's encoding
-
- # decode committer name
- try:
- self.committer.name = self.committer.name.decode(self.encoding)
- except UnicodeDecodeError:
- print >> sys.stderr, "Failed to decode committer name '%s' using encoding %s" % (self.committer.name, self.encoding)
- # END handle author's encoding
-
- # a stream from our data simply gives us the plain message
- # The end of our message stream is marked with a newline that we strip
- self.message = stream.read()
- try:
- self.message = self.message.decode(self.encoding)
- except UnicodeDecodeError:
- print >> sys.stderr, "Failed to decode message '%s' using encoding %s" % (self.message, self.encoding)
- # END exception handling
- return self
-
- #} END serializable implementation
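A round-trip sketch of the (de)serialization removed above; odb and binsha are assumptions standing for an object database and the binary sha of an existing commit:

    from cStringIO import StringIO
    from gitdb.object.commit import Commit

    c = Commit(odb, binsha)     # attributes deserialize lazily on first access
    print c.summary             # first line of the commit message

    stream = StringIO()
    c._serialize(stream)        # writes tree, parents, author/committer lines and message
    stream.seek(0)
    c2 = Commit(odb, binsha)._deserialize(stream)
    assert c2.tree == c.tree and c2.message == c.message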
diff --git a/gitdb/object/fun.py b/gitdb/object/fun.py
index 9b0a377..22016b2 100644
--- a/gitdb/object/fun.py
+++ b/gitdb/object/fun.py
@@ -1,199 +1,2 @@
"""Module with functions which are supposed to be as fast as possible"""
-from stat import S_ISDIR
-__all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive',
- 'traverse_tree_recursive')
-
-
-
-
-def tree_to_stream(entries, write):
- """Write the give list of entries into a stream using its write method
- :param entries: **sorted** list of tuples with (binsha, mode, name)
- :param write: write method which takes a data string"""
- ord_zero = ord('0')
- bit_mask = 7 # 3 bits set
-
- for binsha, mode, name in entries:
- mode_str = ''
- for i in xrange(6):
- mode_str = chr(((mode >> (i*3)) & bit_mask) + ord_zero) + mode_str
- # END for each octal digit
-
- # git slices away the first octal digit if it's zero
- if mode_str[0] == '0':
- mode_str = mode_str[1:]
- # END save a byte
-
- # here it comes: if the name is actually unicode, the replacement below
- # will not work as the binsha is not part of the ascii unicode encoding -
- # hence we must convert to a utf8 string for it to work properly.
- # According to my tests, this is exactly what git does, that is it just
- # takes the input literally, which appears to be utf8 on linux.
- if isinstance(name, unicode):
- name = name.encode("utf8")
- write("%s %s\0%s" % (mode_str, name, binsha))
- # END for each item
-
-
-def tree_entries_from_data(data):
- """Reads the binary representation of a tree and returns tuples of Tree items
- :param data: data block with tree data
- :return: list(tuple(binsha, mode, tree_relative_path), ...)"""
- ord_zero = ord('0')
- len_data = len(data)
- i = 0
- out = list()
- while i < len_data:
- mode = 0
-
- # read mode
- # Some git versions truncate the leading 0, some don't
- # The type will be extracted from the mode later
- while data[i] != ' ':
- # move existing mode integer up one level being 3 bits
- # and add the actual ordinal value of the character
- mode = (mode << 3) + (ord(data[i]) - ord_zero)
- i += 1
- # END while reading mode
-
- # byte is space now, skip it
- i += 1
-
- # parse name, it is NULL separated
-
- ns = i
- while data[i] != '\0':
- i += 1
- # END while not reached NULL
-
- # default encoding for strings in git is utf8
- # Only use the respective unicode object if the byte stream was encoded
- name = data[ns:i]
- name_enc = name.decode("utf-8")
- if len(name) > len(name_enc):
- name = name_enc
- # END handle encoding
-
- # byte is NULL, get next 20
- i += 1
- sha = data[i:i+20]
- i = i + 20
- out.append((sha, mode, name))
- # END for each byte in data stream
- return out
-
-
-def _find_by_name(tree_data, name, is_dir, start_at):
- """return data entry matching the given name and tree mode
- or None.
- Before the item is returned, the respective data item is set
- None in the tree_data list to mark it done"""
- try:
- item = tree_data[start_at]
- if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
- tree_data[start_at] = None
- return item
- except IndexError:
- pass
- # END exception handling
- for index, item in enumerate(tree_data):
- if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
- tree_data[index] = None
- return item
- # END if item matches
- # END for each item
- return None
-
-def _to_full_path(item, path_prefix):
- """Rebuild entry with given path prefix"""
- if not item:
- return item
- return (item[0], item[1], path_prefix+item[2])
-
-def traverse_trees_recursive(odb, tree_shas, path_prefix):
- """
- :return: list with entries according to the given binary tree-shas.
- The result is encoded in a list
- of n tuple|None per blob/commit, (n == len(tree_shas)), where
- * [0] == 20 byte sha
- * [1] == mode as int
- * [2] == path relative to working tree root
- The entry tuple is None if the respective blob/commit did not
- exist in the given tree.
- :param tree_shas: iterable of shas pointing to trees. All trees must
- be on the same level. A tree-sha may be None, in which case None
- entries will be returned in its place.
- :param path_prefix: a prefix to be added to the returned paths on this level,
- set it to '' for the first iteration
- :note: The ordering of the returned items will be partially lost"""
- trees_data = list()
- nt = len(tree_shas)
- for tree_sha in tree_shas:
- if tree_sha is None:
- data = list()
- else:
- data = tree_entries_from_data(odb.stream(tree_sha).read())
- # END handle muted trees
- trees_data.append(data)
- # END for each sha to get data for
-
- out = list()
- out_append = out.append
-
- # find all matching entries and recursively process them together if the match
- # is a tree. If the match is a non-tree item, put it into the result.
- # Processed items will be set None
- for ti, tree_data in enumerate(trees_data):
- for ii, item in enumerate(tree_data):
- if not item:
- continue
- # END skip already done items
- entries = [ None for n in range(nt) ]
- entries[ti] = item
- sha, mode, name = item # it's faster to unpack
- is_dir = S_ISDIR(mode) # type mode bits
-
- # find this item in all other tree data items
- # wrap around, but stop one before our current index, hence
- # ti+nt, not ti+1+nt
- for tio in range(ti+1, ti+nt):
- tio = tio % nt
- entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
- # END for each other item data
-
- # if we are a directory, enter recursion
- if is_dir:
- out.extend(traverse_trees_recursive(odb, [((ei and ei[0]) or None) for ei in entries], path_prefix+name+'/'))
- else:
- out_append(tuple(_to_full_path(e, path_prefix) for e in entries))
- # END handle recursion
-
- # finally mark it done
- tree_data[ii] = None
- # END for each item
-
- # we are done with one tree, set all its data empty
- del(tree_data[:])
- # END for each tree_data chunk
- return out
-
-def traverse_tree_recursive(odb, tree_sha, path_prefix):
- """
- :return: list of entries of the tree pointed to by the binary tree_sha. An entry
- has the following format:
- * [0] 20 byte sha
- * [1] mode as int
- * [2] path relative to the repository
- :param path_prefix: prefix to prepend to the front of all returned paths"""
- entries = list()
- data = tree_entries_from_data(odb.stream(tree_sha).read())
-
- # unpacking/packing is faster than accessing individual items
- for sha, mode, name in data:
- if S_ISDIR(mode):
- entries.extend(traverse_tree_recursive(odb, sha, path_prefix+name+'/'))
- else:
- entries.append((sha, mode, path_prefix+name))
- # END for each item
-
- return entries
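Because tree_to_stream and tree_entries_from_data are pure functions, they can be exercised without a repository. A round-trip sketch with dummy shas:

    from cStringIO import StringIO
    from gitdb.object.fun import tree_to_stream, tree_entries_from_data

    entries = [("\x01" * 20, 0100644, "README"),   # (binsha, mode, name), sorted by name
               ("\x02" * 20, 040000, "src")]

    out = StringIO()
    tree_to_stream(entries, out.write)             # mode as stripped octal, space, name, NUL, 20 byte sha
    assert tree_entries_from_data(out.getvalue()) == entries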
diff --git a/gitdb/object/submodule.py b/gitdb/object/submodule.py
index 77669b3..33cb649 100644
--- a/gitdb/object/submodule.py
+++ b/gitdb/object/submodule.py
@@ -1,12 +1,6 @@
from base import IndexObject
-class Submodule(IndexObject):
- """Dummy type representing submodules. At some point an implemenation might be add
- ( it currently is located in GitPython )"""
- # this is a bogus type for base class compatability
- type = 'submodule'
- # this type doesn't really have a type id
- type_id = 0
+
diff --git a/gitdb/object/tag.py b/gitdb/object/tag.py
index ce702c7..c09daab 100644
--- a/gitdb/object/tag.py
+++ b/gitdb/object/tag.py
@@ -4,76 +4,4 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
""" Module containing all object based types. """
-import base
-from gitdb.util import hex_to_bin
-from util import (
- get_object_type_by_name,
- parse_actor_and_date
- )
-from gitdb.typ import ObjectType
-
-__all__ = ("TagObject", )
-
-class TagObject(base.Object):
- """Non-Lightweight tag carrying additional information about an object we are pointing to."""
- type = ObjectType.tag
- type_id = ObjectType.tag_id
-
- __slots__ = ( "object", "tag", "tagger", "tagged_date", "tagger_tz_offset", "message" )
-
- def __init__(self, odb, binsha, object=None, tag=None,
- tagger=None, tagged_date=None, tagger_tz_offset=None, message=None):
- """Initialize a tag object with additional data
-
- :param odb: repository this object is located in
- :param binsha: 20 byte SHA1
- :param object: Object instance of object we are pointing to
- :param tag: name of this tag
- :param tagger: Actor identifying the tagger
- :param tagged_date: int_seconds_since_epoch
- is the DateTime of the tag creation - use time.gmtime to convert
- it into a different format
- :param tagger_tz_offset: int_seconds_west_of_utc is the timezone that the
- tagged_date is in, in a format similar to time.altzone"""
- super(TagObject, self).__init__(odb, binsha )
- if object is not None:
- self.object = object
- if tag is not None:
- self.tag = tag
- if tagger is not None:
- self.tagger = tagger
- if tagged_date is not None:
- self.tagged_date = tagged_date
- if tagger_tz_offset is not None:
- self.tagger_tz_offset = tagger_tz_offset
- if message is not None:
- self.message = message
-
- def _set_cache_(self, attr):
- """Cache all our attributes at once"""
- if attr in TagObject.__slots__:
- ostream = self.odb.stream(self.binsha)
- lines = ostream.read().splitlines()
-
- obj, hexsha = lines[0].split(" ") # object <hexsha>
- type_token, type_name = lines[1].split(" ") # type <type_name>
- self.object = get_object_type_by_name(type_name)(self.odb, hex_to_bin(hexsha))
-
- self.tag = lines[2][4:] # tag <tag name>
-
- tagger_info = lines[3][7:]# tagger <actor> <date>
- self.tagger, self.tagged_date, self.tagger_tz_offset = parse_actor_and_date(tagger_info)
-
- # line 4 is empty - normally a newline separates the header from the message.
- # In case there really is no message, the line would not exist and the
- # next header could begin there instead
- if len(lines) > 5:
- self.message = "\n".join(lines[5:])
- else:
- self.message = ''
- # END check our attributes
- else:
- super(TagObject, self)._set_cache_(attr)
-
-
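For reference, the removed _set_cache_ parses a raw tag buffer with this fixed line layout (all values below are illustrative):

    object 1f0c6afb3f9d43c40a1a5dff1b95f22017b09239
    type commit
    tag v0.5.0
    tagger Sebastian Thiel <byronimo@gmail.com> 1304620000 +0200

    The message follows after the blank separator line.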
diff --git a/gitdb/object/tree.py b/gitdb/object/tree.py
index 8dabd6e..110fa7d 100644
--- a/gitdb/object/tree.py
+++ b/gitdb/object/tree.py
@@ -3,285 +3,4 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import util
-from gitdb.typ import ObjectType
-from base import IndexObject
-from blob import Blob
-from submodule import Submodule
-from fun import (
- tree_entries_from_data,
- tree_to_stream
- )
-
-from gitdb.util import (
- to_bin_sha,
- join_path
- )
-
-__all__ = ("TreeModifier", "Tree")
-
-class TreeModifier(object):
- """A utility class providing methods to alter the underlying cache in a list-like fashion.
-
- Once all adjustments are complete, the _cache, which really is a reference to
- the cache of a tree, will be sorted, assuring it is in a serializable state"""
- __slots__ = '_cache'
-
- def __init__(self, cache):
- self._cache = cache
-
- def _index_by_name(self, name):
- """:return: index of an item with name, or -1 if not found"""
- for i, t in enumerate(self._cache):
- if t[2] == name:
- return i
- # END found item
- # END for each item in cache
- return -1
-
- #{ Interface
- def set_done(self):
- """Call this method once you are done modifying the tree information.
- It may be called several times, but be aware that each call will cause
- a sort operation
- :return: self"""
- self._cache.sort(key=lambda t: t[2]) # sort by name
- return self
- #} END interface
-
- #{ Mutators
- def add(self, sha, mode, name, force=False):
- """Add the given item to the tree. If an item with the given name already
- exists, nothing will be done, but a ValueError will be raised if the
- sha and mode of the existing item do not match the one you add, unless
- force is True
-
- :param sha: The 20 or 40 byte sha of the item to add
- :param mode: int representing the stat compatible mode of the item
- :param force: If True, an item with your name and information will overwrite
- any existing item with the same name, no matter which information it has
- :return: self"""
- if '/' in name:
- raise ValueError("Name must not contain '/' characters")
- if (mode >> 12) not in Tree._map_id_to_type:
- raise ValueError("Invalid object type according to mode %o" % mode)
-
- sha = to_bin_sha(sha)
- index = self._index_by_name(name)
- item = (sha, mode, name)
- if index == -1:
- self._cache.append(item)
- else:
- if force:
- self._cache[index] = item
- else:
- ex_item = self._cache[index]
- if ex_item[0] != sha or ex_item[1] != mode:
- raise ValueError("Item %r existed with different properties" % name)
- # END handle mismatch
- # END handle force
- # END handle name exists
- return self
-
- def add_unchecked(self, binsha, mode, name):
- """Add the given item to the tree, its correctness is assumed, which
- puts the caller into responsibility to assure the input is correct.
- For more information on the parameters, see ``add``
- :param binsha: 20 byte binary sha"""
- self._cache.append((binsha, mode, name))
-
- def __delitem__(self, name):
- """Deletes an item with the given name if it exists"""
- index = self._index_by_name(name)
- if index > -1:
- del(self._cache[index])
-
- #} END mutators
-
-
-class Tree(IndexObject, util.Traversable, util.Serializable):
- """Tree objects represent an ordered list of Blobs and other Trees.
-
- ``Tree as a list``::
-
- Access a specific blob using the
- tree['filename'] notation.
-
- You may as well access by index
- blob = tree[0]
- """
-
- type = ObjectType.tree
- type_id = ObjectType.tree_id
-
- __slots__ = "_cache"
-
- # actual integer ids for comparison
- commit_id = 016 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link
- blob_id = 010
- symlink_id = 012
- tree_id = 004
-
- #{ Configuration
-
- # override in subclass if you would like your own types to be instantiated instead
- _map_id_to_type = {
- commit_id : Submodule,
- blob_id : Blob,
- symlink_id : Blob
- # tree id added once Tree is defined
- }
-
- #} end configuration
-
-
- def __init__(self, repo, binsha, mode=tree_id<<12, path=None):
- super(Tree, self).__init__(repo, binsha, mode, path)
-
- @classmethod
- def _get_intermediate_items(cls, index_object):
- if index_object.type == "tree":
- return tuple(index_object._iter_convert_to_object(index_object._cache))
- return tuple()
-
- def _set_cache_(self, attr):
- if attr == "_cache":
- # Set the data when we need it
- ostream = self.odb.stream(self.binsha)
- self._cache = tree_entries_from_data(ostream.read())
- else:
- super(Tree, self)._set_cache_(attr)
- # END handle attribute
-
- def _iter_convert_to_object(self, iterable):
- """Iterable yields tuples of (binsha, mode, name), which will be converted
- to the respective object representation"""
- for binsha, mode, name in iterable:
- path = join_path(self.path, name)
- try:
- yield self._map_id_to_type[mode >> 12](self.odb, binsha, mode, path)
- except KeyError:
- raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path))
- # END for each item
-
- def __div__(self, file):
- """Find the named object in this tree's contents
- :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule``
-
- :raise KeyError: if given file or tree does not exist in tree"""
- msg = "Blob or Tree named %r not found"
- if '/' in file:
- tree = self
- item = self
- tokens = file.split('/')
- for i,token in enumerate(tokens):
- item = tree[token]
- if item.type == 'tree':
- tree = item
- else:
- # safety assertion - blobs are at the end of the path
- if i != len(tokens)-1:
- raise KeyError(msg % file)
- return item
- # END handle item type
- # END for each token of split path
- if item == self:
- raise KeyError(msg % file)
- return item
- else:
- for info in self._cache:
- if info[2] == file: # [2] == name
- return self._map_id_to_type[info[1] >> 12](self.odb, info[0], info[1], join_path(self.path, info[2]))
- # END for each obj
- raise KeyError( msg % file )
- # END handle long paths
-
-
- @property
- def trees(self):
- """:return: list(Tree, ...) list of trees directly below this tree"""
- return [ i for i in self if i.type == "tree" ]
-
- @property
- def blobs(self):
- """:return: list(Blob, ...) list of blobs directly below this tree"""
- return [ i for i in self if i.type == "blob" ]
-
- @property
- def cache(self):
- """
- :return: An object allowing to modify the internal cache. This can be used
- to change the tree's contents. When done, make sure you call ``set_done``
- on the tree modifier, or serialization behaviour will be incorrect.
- See the ``TreeModifier`` for more information on how to alter the cache"""
- return TreeModifier(self._cache)
-
- def traverse( self, predicate = lambda i,d: True,
- prune = lambda i,d: False, depth = -1, branch_first=True,
- visit_once = False, ignore_self=1 ):
- """For documentation, see util.Traversable.traverse
- Trees are set to visit_once = False to gain more performance in the traversal"""
- return super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
-
- # List protocol
- def __getslice__(self, i, j):
- return list(self._iter_convert_to_object(self._cache[i:j]))
-
- def __iter__(self):
- return self._iter_convert_to_object(self._cache)
-
- def __len__(self):
- return len(self._cache)
-
- def __getitem__(self, item):
- if isinstance(item, int):
- info = self._cache[item]
- return self._map_id_to_type[info[1] >> 12](self.odb, info[0], info[1], join_path(self.path, info[2]))
-
- if isinstance(item, basestring):
- # compatibility
- return self.__div__(item)
- # END index is basestring
-
- raise TypeError( "Invalid index type: %r" % item )
-
-
- def __contains__(self, item):
- if isinstance(item, IndexObject):
- for info in self._cache:
- if item.binsha == info[0]:
- return True
- # END compare sha
- # END for each entry
- # END handle item is index object
- # compatibility
-
- # treat item as repo-relative path
- path = self.path
- for info in self._cache:
- if item == join_path(path, info[2]):
- return True
- # END for each item
- return False
-
- def __reversed__(self):
- return reversed(self._iter_convert_to_object(self._cache))
-
- def _serialize(self, stream):
- """Serialize this tree into the stream. Please note that we will assume
- our tree data to be in a sorted state. If this is not the case, serialization
- will not generate a correct tree representation as these are assumed to be sorted
- by algorithms"""
- tree_to_stream(self._cache, stream.write)
- return self
-
- def _deserialize(self, stream):
- self._cache = tree_entries_from_data(stream.read())
- return self
-
-
-# END tree
-
-# finalize map definition
-Tree._map_id_to_type[Tree.tree_id] = Tree
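A usage sketch of the removed Tree type; odb and binsha are assumptions for an object database and the binary sha of an existing tree, and 'README' stands for any entry in it:

    from gitdb.object.tree import Tree

    tree = Tree(odb, binsha)        # mode defaults to tree_id << 12
    blob = tree["README"]           # lookup by name, same as tree.__div__("README")
    for item in tree.traverse():    # iterates blobs and subtrees, breadth-first
        print item.type, item.path

    mod = tree.cache                            # TreeModifier over the entry cache
    mod.add(blob.binsha, blob.mode, "COPY")
    mod.set_done()                              # re-sort so serialization stays correct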
diff --git a/gitdb/object/util.py b/gitdb/object/util.py
index e63cb30..5f61495 100644
--- a/gitdb/object/util.py
+++ b/gitdb/object/util.py
@@ -4,312 +4,4 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module for general utility functions"""
-from gitdb.util import (
- IterableList,
- Actor
- )
-import re
-from collections import deque as Deque
-
-from string import digits
-import time
-import os
-
-__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date',
- 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz',
- 'verify_utctz', 'Actor')
-
-#{ Functions
-
-def mode_str_to_int(modestr):
- """
- :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used
- :return:
- int identifying a mode compatible to the mode method ids of the
- stat module regarding the rwx permissions for user, group and other,
- special flags and file system flags, i.e. whether it is a symlink
- for example."""
- mode = 0
- for iteration, char in enumerate(reversed(modestr[-6:])):
- mode += int(char) << iteration*3
- # END for each char
- return mode
-
-def get_object_type_by_name(object_type_name):
- """
- :return: type suitable to handle the given object type name.
- Use the type to create new instances.
-
- :param object_type_name: Member of TYPES
-
- :raise ValueError: In case object_type_name is unknown"""
- if object_type_name == "commit":
- import commit
- return commit.Commit
- elif object_type_name == "tag":
- import tag
- return tag.TagObject
- elif object_type_name == "blob":
- import blob
- return blob.Blob
- elif object_type_name == "tree":
- import tree
- return tree.Tree
- else:
- raise ValueError("Cannot handle unknown object type: %s" % object_type_name)
-
-def utctz_to_altz(utctz):
- """we convert utctz to the timezone in seconds, it is the format time.altzone
- returns. Git stores it as UTC timezone which has the opposite sign as well,
- which explains the -1 * ( that was made explicit here )
- :param utctz: git utc timezone string, i.e. +0200"""
- return -1 * int(float(utctz)/100*3600)
-
-def altz_to_utctz_str(altz):
- """As above, but inverses the operation, returning a string that can be used
- in commit objects"""
- utci = -1 * int((altz / 3600)*100)
- utcs = str(abs(utci))
- utcs = "0"*(4-len(utcs)) + utcs
- prefix = (utci < 0 and '-') or '+'
- return prefix + utcs
-
-
-def verify_utctz(offset):
- """:raise ValueError: if offset is incorrect
- :return: offset"""
- fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
- if len(offset) != 5:
- raise fmt_exc
- if offset[0] not in "+-":
- raise fmt_exc
- if offset[1] not in digits or \
- offset[2] not in digits or \
- offset[3] not in digits or \
- offset[4] not in digits:
- raise fmt_exc
- # END for each char
- return offset
-
-def parse_date(string_date):
- """
- Parse the given date as one of the following
-
- * Git internal format: timestamp offset
- * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200.
- * ISO 8601 2005-04-07T22:13:13
- The T can be a space as well
-
- :return: Tuple(int(timestamp), int(offset)), both in seconds since epoch
- :raise ValueError: If the format could not be understood
- :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY"""
- # git time
- try:
- if string_date.count(' ') == 1 and string_date.rfind(':') == -1:
- timestamp, offset = string_date.split()
- timestamp = int(timestamp)
- return timestamp, utctz_to_altz(verify_utctz(offset))
- else:
- offset = "+0000" # local time by default
- if string_date[-5] in '-+':
- offset = verify_utctz(string_date[-5:])
- string_date = string_date[:-6] # skip space as well
- # END split timezone info
-
- # now figure out the date and time portion - split time
- date_formats = list()
- splitter = -1
- if ',' in string_date:
- date_formats.append("%a, %d %b %Y")
- splitter = string_date.rfind(' ')
- else:
- # iso plus additional
- date_formats.append("%Y-%m-%d")
- date_formats.append("%Y.%m.%d")
- date_formats.append("%m/%d/%Y")
- date_formats.append("%d.%m.%Y")
-
- splitter = string_date.rfind('T')
- if splitter == -1:
- splitter = string_date.rfind(' ')
- # END handle 'T' and ' '
- # END handle rfc or iso
-
- assert splitter > -1
-
- # split date and time
- time_part = string_date[splitter+1:] # skip space
- date_part = string_date[:splitter]
-
- # parse time
- tstruct = time.strptime(time_part, "%H:%M:%S")
-
- for fmt in date_formats:
- try:
- dtstruct = time.strptime(date_part, fmt)
- fstruct = time.struct_time((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday,
- tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec,
- dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst))
- return int(time.mktime(fstruct)), utctz_to_altz(offset)
- except ValueError:
- continue
- # END exception handling
- # END for each fmt
-
- # still here ? fail
- raise ValueError("no format matched")
- # END handle format
- except Exception:
- raise ValueError("Unsupported date format: %s" % string_date)
- # END handle exceptions
-
-
-# precompiled regex
-_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$')
-
-def parse_actor_and_date(line):
- """Parse out the actor (author or committer) info from a line like::
-
- author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700
-
- :return: [Actor, int_seconds_since_epoch, int_timezone_offset]"""
- m = _re_actor_epoch.search(line)
- actor, epoch, offset = m.groups()
- return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))
-
-
-#} END functions
-
-
-#{ Classes
-
-class ProcessStreamAdapter(object):
- """Class wireing all calls to the contained Process instance.
-
- Use this type to hide the underlying process to provide access only to a specified
- stream. The process is usually wrapped into an AutoInterrupt class to kill
- it if the instance goes out of scope."""
- __slots__ = ("_proc", "_stream")
- def __init__(self, process, stream_name):
- self._proc = process
- self._stream = getattr(process, stream_name)
-
- def __getattr__(self, attr):
- return getattr(self._stream, attr)
-
-
-class Traversable(object):
- """Simple interface to perforam depth-first or breadth-first traversals
- into one direction.
- Subclasses only need to implement one function.
- Instances of the Subclass must be hashable"""
- __slots__ = tuple()
-
- @classmethod
- def _get_intermediate_items(cls, item):
- """
- Returns:
- List of items connected to the given item.
- Must be implemented in subclass
- """
- raise NotImplementedError("To be implemented in subclass")
-
- def list_traverse(self, *args, **kwargs):
- """
- :return: IterableList with the results of the traversal as produced by
- traverse()"""
- out = IterableList(self._id_attribute_)
- out.extend(self.traverse(*args, **kwargs))
- return out
-
- def traverse( self, predicate = lambda i,d: True,
- prune = lambda i,d: False, depth = -1, branch_first=True,
- visit_once = True, ignore_self=1, as_edge = False ):
- """:return: iterator yieling of items found when traversing self
-
- :param predicate: f(i,d) returns False if item i at depth d should not be included in the result
-
- :param prune:
- f(i,d) return True if the search should stop at item i at depth d.
- Item i will not be returned.
-
- :param depth:
- define at which level the iteration should not go deeper
- if -1, there is no limit
- if 0, you would effectively only get self, the root of the iteration
- i.e. if 1, you would only get the first level of predecessors/successors
-
- :param branch_first:
- if True, items will be returned branch first, otherwise depth first
-
- :param visit_once:
- if True, items will only be returned once, although they might be encountered
- several times. Loops are prevented that way.
-
- :param ignore_self:
- if True, self will be ignored and automatically pruned from
- the result. Otherwise it will be the first item to be returned.
- If as_edge is True, the source of the first edge is None
-
- :param as_edge:
- if True, return a pair of items, first being the source, second the
- destination, i.e. tuple(src, dest) with the edge spanning from
- source to destination"""
- visited = set()
- stack = Deque()
- stack.append( ( 0 ,self, None ) ) # self is always depth level 0
-
- def addToStack( stack, item, branch_first, depth ):
- lst = self._get_intermediate_items( item )
- if not lst:
- return
- if branch_first:
- stack.extendleft( ( depth , i, item ) for i in lst )
- else:
- reviter = ( ( depth , lst[i], item ) for i in range( len( lst )-1,-1,-1) )
- stack.extend( reviter )
- # END addToStack local method
-
- while stack:
- d, item, src = stack.pop() # depth of item, item, item_source
-
- if visit_once and item in visited:
- continue
-
- if visit_once:
- visited.add(item)
-
- rval = ( as_edge and (src, item) ) or item
- if prune( rval, d ):
- continue
-
- skipStartItem = ignore_self and ( item is self )
- if not skipStartItem and predicate( rval, d ):
- yield rval
-
- # only continue to next level if this is appropriate !
- nd = d + 1
- if depth > -1 and nd > depth:
- continue
-
- addToStack( stack, item, branch_first, nd )
- # END for each item on work stack
-
-
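To make the traversal algorithm above concrete, here is a sketch with a toy Traversable subclass; Node and its fields are hypothetical:

class Node(Traversable):
    __slots__ = ('name', 'children')
    _id_attribute_ = 'name'
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)
    @classmethod
    def _get_intermediate_items(cls, item):
        # the single method a subclass must provide
        return item.children

a, b = Node('a'), Node('b')
root = Node('root', [a, b])
# self is pruned by default (ignore_self=1)
assert [n.name for n in root.traverse()] == ['a', 'b']
# depth=0 limits the iteration to the root itself
assert list(root.traverse(depth=0, ignore_self=False)) == [root]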
-class Serializable(object):
- """Defines methods to serialize and deserialize objects from and into a data stream"""
- __slots__ = tuple()
-
- def _serialize(self, stream):
- """Serialize the data of this object into the given data stream
- :note: a serialized object would ``_deserialize`` into the same object
- :param stream: a file-like object
- :return: self"""
- raise NotImplementedError("To be implemented in subclass")
-
- def _deserialize(self, stream):
- """Deserialize all information regarding this object from the stream
- :param stream: a file-like object
- :return: self"""
- raise NotImplementedError("To be implemented in subclass")
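A sketch of implementing the Serializable interface for a trivial type, round-tripping through an in-memory stream (the Message class is hypothetical):

from cStringIO import StringIO

class Message(Serializable):
    __slots__ = ('text',)
    def _serialize(self, stream):
        stream.write(self.text)
        return self
    def _deserialize(self, stream):
        self.text = stream.read()
        return self

msg = Message()
msg.text = "hello"
buf = StringIO()
msg._serialize(buf)
buf.seek(0)
assert Message()._deserialize(buf).text == "hello"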
diff --git a/gitdb/ref/head.py b/gitdb/ref/head.py
index 4103887..321b81b 100644
--- a/gitdb/ref/head.py
+++ b/gitdb/ref/head.py
@@ -1,22 +1,9 @@
-from symbolic import SymbolicReference
+
__all__ = ["HEAD"]
class HEAD(SymbolicReference):
"""Special case of a Symbolic Reference as it represents the repository's
HEAD reference."""
- _HEAD_NAME = 'HEAD'
- _ORIG_HEAD_NAME = 'ORIG_HEAD'
- __slots__ = tuple()
-
- def __init__(self, repo, path=_HEAD_NAME):
- if path != self._HEAD_NAME:
- raise ValueError("HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path))
- super(HEAD, self).__init__(repo, path)
- def orig_head(self):
- """
- :return: SymbolicReference pointing at the ORIG_HEAD, which is maintained
- to contain the previous value of HEAD"""
- return SymbolicReference(self.repo, self._ORIG_HEAD_NAME)
diff --git a/gitdb/ref/headref.py b/gitdb/ref/headref.py
index 798f651..8b13789 100644
--- a/gitdb/ref/headref.py
+++ b/gitdb/ref/headref.py
@@ -1,84 +1 @@
-from reference import Reference
-from gitdb.config import SectionConstraint
-from gitdb.util import join_path
-
-__all__ = ["Head"]
-
-class Head(Reference):
- """A Head is a named reference to a Commit"""
- _common_path_default = "refs/heads"
- k_config_remote = "remote"
- k_config_remote_ref = "merge" # branch to merge from remote
-
- # will be set by init method !
- RemoteReferenceCls = None
-
- #{ Configuration
-
- def set_tracking_branch(self, remote_reference):
- """
- Configure this branch to track the given remote reference. This will alter
- this branch's configuration accordingly.
-
- :param remote_reference: The remote reference to track or None to untrack
- any references
- :return: self"""
- if remote_reference is not None and not isinstance(remote_reference, self.RemoteReferenceCls):
- raise ValueError("Incorrect parameter type: %r" % remote_reference)
- # END handle type
-
- writer = self.config_writer()
- if remote_reference is None:
- writer.remove_option(self.k_config_remote)
- writer.remove_option(self.k_config_remote_ref)
- if len(writer.options()) == 0:
- writer.remove_section()
- # END handle remove section
- else:
- writer.set_value(self.k_config_remote, remote_reference.remote_name)
- writer.set_value(self.k_config_remote_ref, Head.to_full_path(remote_reference.remote_head))
- # END handle ref value
-
- return self
-
- def tracking_branch(self):
- """
- :return: The remote_reference we are tracking, or None if we are
- not a tracking branch"""
- reader = self.config_reader()
- if reader.has_option(self.k_config_remote) and reader.has_option(self.k_config_remote_ref):
- ref = Head(self.repo, Head.to_full_path(reader.get_value(self.k_config_remote_ref)))
- remote_refpath = self.RemoteReferenceCls.to_full_path(join_path(reader.get_value(self.k_config_remote), ref.name))
- return self.RemoteReferenceCls(self.repo, remote_refpath)
- # END handle have tracking branch
-
- # we are not a tracking branch
- return None
-
-
- #{ Configuration
-
- def _config_parser(self, read_only):
- if read_only:
- parser = self.repo.config_reader()
- else:
- parser = self.repo.config_writer()
- # END handle parser instance
-
- return SectionConstraint(parser, 'branch "%s"' % self.name)
-
- def config_reader(self):
- """
- :return: A configuration parser instance constrained to only read
- this instance's values"""
- return self._config_parser(read_only=True)
-
- def config_writer(self):
- """
- :return: A configuration writer instance with read and write access
- to options of this head"""
- return self._config_parser(read_only=False)
-
- #} END configuration
-
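For orientation, set_tracking_branch() above amounts to writing a branch section into the repository configuration, which tracking_branch() later reads back; a sketch with hypothetical branch and remote names:

# resulting .git/config section (illustrative values):
#
#   [branch "master"]
#       remote = origin
#       merge = refs/heads/master
#
# tracking_branch() recombines the 'remote' and 'merge' values into the
# path refs/remotes/origin/master and returns a RemoteReferenceCls for it.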
diff --git a/gitdb/ref/log.py b/gitdb/ref/log.py
index 3aa0b4a..8b13789 100644
--- a/gitdb/ref/log.py
+++ b/gitdb/ref/log.py
@@ -1,279 +1 @@
-from gitdb.util import (
- join_path,
- Actor,
- LockedFD,
- LockFile,
- assure_directory_exists,
- to_native_path,
- bin_to_hex,
- join,
- file_contents_ro_filepath
- )
-from gitdb.object.util import (
- parse_date,
- Serializable,
- utctz_to_altz,
- altz_to_utctz_str,
- )
-
-import time
-import os
-import re
-
-__all__ = ["RefLog", "RefLogEntry"]
-
-
-class RefLogEntry(tuple):
- """Named tuple allowing easy access to the revlog data fields"""
- _fmt = "%s %s %s <%s> %i %s\t%s\n"
- _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$')
- __slots__ = tuple()
-
- def __repr__(self):
- """Representation of ourselves in git reflog format"""
- act = self.actor
- time = self.time
- return self._fmt % (self.oldhexsha, self.newhexsha, act.name, act.email,
- time[0], altz_to_utctz_str(time[1]), self.message)
-
- @property
- def oldhexsha(self):
- """The hexsha to the commit the ref pointed to before the change"""
- return self[0]
-
- @property
- def newhexsha(self):
- """The hexsha to the commit the ref now points to, after the change"""
- return self[1]
-
- @property
- def actor(self):
- """Actor instance, providing access"""
- return self[2]
-
- @property
- def time(self):
- """time as tuple:
-
- * [0] = int(time)
- * [1] = int(timezone_offset) in time.altzone format """
- return self[3]
-
- @property
- def message(self):
- """Message describing the operation that acted on the reference"""
- return self[4]
-
- @classmethod
- def new(cls, oldhexsha, newhexsha, actor, time, tz_offset, message):
- """:return: New instance of a RefLogEntry"""
- if not isinstance(actor, Actor):
- raise ValueError("Need actor instance, got %s" % actor)
- # END check types
- return cls((oldhexsha, newhexsha, actor, (time, tz_offset), message))
-
- @classmethod
- def from_line(cls, line):
- """:return: New RefLogEntry instance from the given revlog line.
- :param line: line without trailing newline
- :raise ValueError: If line could not be parsed"""
- try:
- info, msg = line.split('\t', 1)
- except ValueError:
- raise ValueError("line is missing tab separator")
- #END handle first split
- oldhexsha = info[:40]
- newhexsha = info[41:81]
- for hexsha in (oldhexsha, newhexsha):
- if not cls._re_hexsha_only.match(hexsha):
- raise ValueError("Invalid hexsha: %s" % hexsha)
- # END if hexsha re doesn't match
- #END for each hexsha
-
- email_end = info.find('>', 82)
- if email_end == -1:
- raise ValueError("Missing token: >")
- #END handle missing end brace
-
- actor = Actor._from_string(info[82:email_end+1])
- time, tz_offset = parse_date(info[email_end+2:])
-
- return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg))
-
-
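A parsing sketch for from_line() above; the shas and the actor are placeholders:

line = ("0" * 40) + " " + ("1" * 40) + \
       " John Doe <jdoe@example.com> 1191999972 -0700\tcommit: initial"
entry = RefLogEntry.from_line(line)
assert entry.oldhexsha == "0" * 40
assert entry.newhexsha == "1" * 40
assert entry.message == "commit: initial"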
-class RefLog(list, Serializable):
- """A reflog contains reflog entries, each of which defines a certain state
- of the head in question. Custom query methods allow retrieving log entries
- by date or by other criteria.
-
- Reflog entries are ordered: the first added entry is first in the list, the last
- entry, i.e. the last change of the head or reference, is last in the list."""
-
- __slots__ = ('_path', )
-
- def __new__(cls, filepath=None):
- inst = super(RefLog, cls).__new__(cls)
- return inst
-
- def __init__(self, filepath=None):
- """Initialize this instance with an optional filepath, from which we will
- initialize our data. The path is also used to write changes back using
- the write() method"""
- self._path = filepath
- if filepath is not None:
- self._read_from_file()
- # END handle filepath
-
- def _read_from_file(self):
- fmap = file_contents_ro_filepath(self._path, stream=False, allow_mmap=True)
- try:
- self._deserialize(fmap)
- finally:
- fmap.close()
- #END handle closing of handle
-
- #{ Interface
-
- @classmethod
- def from_file(cls, filepath):
- """
- :return: a new RefLog instance containing all entries from the reflog
- at the given filepath
- :param filepath: path to reflog
- :raise ValueError: If the file could not be read or was corrupted in some way"""
- return cls(filepath)
-
- @classmethod
- def path(cls, ref):
- """
- :return: string to absolute path at which the reflog of the given ref
- instance would be found. The path is not guaranteed to point to a valid
- file though.
- :param ref: SymbolicReference instance"""
- return join(ref.repo.git_path(), "logs", to_native_path(ref.path))
-
- @classmethod
- def iter_entries(cls, stream):
- """
- :return: Iterator yielding RefLogEntry instances, one for each line read
- from the given stream.
- :param stream: file-like object containing the reflog in its native format
- or basestring instance pointing to a file to read"""
- new_entry = RefLogEntry.from_line
- if isinstance(stream, basestring):
- stream = file_contents_ro_filepath(stream)
- #END handle stream type
- while True:
- line = stream.readline()
- if not line:
- return
- yield new_entry(line.strip())
- #END endless loop
-
- @classmethod
- def entry_at(cls, filepath, index):
- """:return: RefLogEntry at the given index
- :param filepath: full path to the index file from which to read the entry
- :param index: python list compatible index, i.e. it may be negative to
- specify an entry counted from the end of the list
-
- :raise IndexError: If the entry didn't exist
-
- .. note:: This method is faster as it only parses the entry at index, skipping
- all other lines. Nonetheless, the whole file has to be read if
- the index is negative
- """
- fp = open(filepath, 'rb')
- if index < 0:
- return RefLogEntry.from_line(fp.readlines()[index].strip())
- else:
- # read until index is reached
- for i in xrange(index+1):
- line = fp.readline()
- if not line:
- break
- #END abort on eof
- #END handle runup
-
- if i != index or not line:
- raise IndexError
- #END handle exception
-
- return RefLogEntry.from_line(line.strip())
- #END handle index
-
- def to_file(self, filepath):
- """Write the contents of the reflog instance to a file at the given filepath.
- :param filepath: path to file, parent directories are assumed to exist"""
- lfd = LockedFD(filepath)
- assure_directory_exists(filepath, is_file=True)
-
- fp = lfd.open(write=True, stream=True)
- try:
- self._serialize(fp)
- lfd.commit()
- except:
- # on failure it rolls back automatically, but we make it clear
- lfd.rollback()
- raise
- #END handle change
-
- @classmethod
- def append_entry(cls, config_reader, filepath, oldbinsha, newbinsha, message):
- """Append a new log entry to the revlog at filepath.
-
- :param config_reader: configuration reader of the repository - used to obtain
- user information. May be None
- :param filepath: full path to the log file
- :param oldbinsha: binary sha of the previous commit
- :param newbinsha: binary sha of the current commit
- :param message: message describing the change to the reference
- :return: RefLogEntry object which was appended to the log
- :note: As we are append-only, concurrent access is not a problem as we
- do not interfere with readers."""
- if len(oldbinsha) != 20 or len(newbinsha) != 20:
- raise ValueError("Shas need to be given in binary format")
- #END handle sha type
- assure_directory_exists(filepath, is_file=True)
- entry = RefLogEntry((bin_to_hex(oldbinsha), bin_to_hex(newbinsha), Actor.committer(config_reader), (int(time.time()), time.altzone), message))
-
- lf = LockFile(filepath)
- lf._obtain_lock_or_raise()
-
- fd = open(filepath, 'a')
- try:
- fd.write(repr(entry))
- finally:
- fd.close()
- lf._release_lock()
- #END handle write operation
-
- return entry
-
- def write(self):
- """Write this instance's data to the file we are originating from
- :return: self"""
- if self._path is None:
- raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
- #END assert path
- self.to_file(self._path)
- return self
-
- #} END interface
-
- #{ Serializable Interface
- def _serialize(self, stream):
- write = stream.write
-
- # write all entries
- for e in self:
- write(repr(e))
- #END for each entry
-
- def _deserialize(self, stream):
- self.extend(self.iter_entries(stream))
- #} END serializable interface
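A usage sketch for the append/read round-trip; the path and shas are placeholders, and the None config_reader leans on the "May be None" note in the docstring above:

logfile = "/tmp/demo_reflog"
RefLog.append_entry(None, logfile, "\x00" * 20, "\x01" * 20, "commit: demo")
entries = list(RefLog.iter_entries(open(logfile)))
assert entries[-1].message == "commit: demo"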
diff --git a/gitdb/ref/reference.py b/gitdb/ref/reference.py
index 6373cb3..8b13789 100644
--- a/gitdb/ref/reference.py
+++ b/gitdb/ref/reference.py
@@ -1,82 +1 @@
-import os
-from symbolic import SymbolicReference
-from head import HEAD
-from gitdb.util import (
- LazyMixin,
- Iterable,
- isfile,
- hex_to_bin
- )
-
-__all__ = ["Reference"]
-
-
-class Reference(SymbolicReference, LazyMixin, Iterable):
- """Represents a named reference to any object. Subclasses may apply restrictions though,
- i.e. Heads can only point to commits."""
- __slots__ = tuple()
- _points_to_commits_only = False
- _resolve_ref_on_create = True
- _common_path_default = "refs"
-
- def __init__(self, repo, path):
- """Initialize this instance
- :param repo: Our parent repository
-
- :param path:
- Path relative to the .git/ directory pointing to the ref in question, i.e.
- refs/heads/master"""
- if not path.startswith(self._common_path_default+'/'):
- raise ValueError("Cannot instantiate %r from path %s" % ( self.__class__.__name__, path ))
- super(Reference, self).__init__(repo, path)
-
-
- def __str__(self):
- return self.name
-
- def set_object(self, object, logmsg = None):
- """Special version which checks if the head-log needs an update as well"""
- oldbinsha = None
- head = HEAD(self.repo)
- if logmsg is not None:
- if not head.is_detached and head.ref == self:
- oldbinsha = self.commit.binsha
- #END handle commit retrieval
- #END handle message is set
-
- super(Reference, self).set_object(object, logmsg)
-
- if oldbinsha is not None:
- # /* from refs.c in git-source
- # * Special hack: If a branch is updated directly and HEAD
- # * points to it (may happen on the remote side of a push
- # * for example) then logically the HEAD reflog should be
- # * updated too.
- # * A generic solution implies reverse symref information,
- # * but finding all symrefs pointing to the given branch
- # * would be rather costly for this rare event (the direct
- # * update of a branch) to be worth it. So let's cheat and
- # * check with HEAD only which should cover 99% of all usage
- # * scenarios (even 100% of the default ones).
- # */
- head.log_append(oldbinsha, logmsg)
- #END check if the head
-
- # NOTE: Don't have to overwrite properties, as they will only work without the log
-
- @property
- def name(self):
- """:return: (shortest) Name of this reference - it may contain path components"""
- # first two path tokens can be removed as they are
- # refs/heads or refs/tags or refs/remotes
- tokens = self.path.split('/')
- if len(tokens) < 3:
- return self.path # could be refs/HEAD
- return '/'.join(tokens[2:])
-
- @classmethod
- def iter_items(cls, repo, common_path = None):
- """Equivalent to SymbolicReference.iter_items, but will return non-detached
- references as well."""
- return cls._iter_items(repo, common_path)
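The name property above shortens paths by dropping the first two tokens; a sketch of the rule:

#   refs/heads/master          -> master
#   refs/remotes/origin/master -> origin/master
#   refs/HEAD                  -> refs/HEAD   (fewer than three tokens, kept as-is)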
diff --git a/gitdb/ref/remote.py b/gitdb/ref/remote.py
index 625234d..8b13789 100644
--- a/gitdb/ref/remote.py
+++ b/gitdb/ref/remote.py
@@ -1,47 +1 @@
-from headref import Head
-from gitdb.util import (
- join,
- join_path
- )
-
-
-__all__ = ["RemoteReference"]
-
-
-class RemoteReference(Head):
- """Represents a reference pointing to a remote head."""
- _common_path_default = "refs/remotes"
-
-
- @classmethod
- def iter_items(cls, repo, common_path = None, remote=None):
- """Iterate remote references, and if given, constrain them to the given remote"""
- common_path = common_path or cls._common_path_default
- if remote is not None:
- common_path = join_path(common_path, str(remote))
- # END handle remote constraint
- return super(RemoteReference, cls).iter_items(repo, common_path)
-
- @property
- def remote_name(self):
- """
- :return:
- Name of the remote we are a reference of, such as 'origin' for a reference
- named 'origin/master'"""
- tokens = self.path.split('/')
- # /refs/remotes/<remote name>/<branch_name>
- return tokens[2]
-
- @property
- def remote_head(self):
- """:return: Name of the remote head itself, i.e. master.
- :note: The returned name is usually not qualified enough to uniquely identify
- a branch"""
- tokens = self.path.split('/')
- return '/'.join(tokens[3:])
-
- @classmethod
- def create(cls, *args, **kwargs):
- """Used to disable this method"""
- raise TypeError("Cannot explicitly create remote references")
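A token-level sketch of the two properties above, with a hypothetical path:

path = "refs/remotes/origin/feature/x"
tokens = path.split('/')
assert tokens[2] == "origin"                # remote_name
assert '/'.join(tokens[3:]) == "feature/x"  # remote_head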
diff --git a/gitdb/ref/symbolic.py b/gitdb/ref/symbolic.py
index 70cfc7d..8b13789 100644
--- a/gitdb/ref/symbolic.py
+++ b/gitdb/ref/symbolic.py
@@ -1,653 +1 @@
-import os
-import re
-from gitdb.object import (
- Object,
- Commit
- )
-from gitdb.util import (
- join_path,
- join_path_native,
- to_native_path_linux,
- assure_directory_exists,
- join,
- dirname,
- isdir,
- exists,
- isfile,
- rename,
- hex_to_bin,
- LockedFD
- )
-
-from gitdb.exc import BadObject
-from log import RefLog
-
-__all__ = ["SymbolicReference"]
-
-class SymbolicReference(object):
- """Represents a special case of a reference such that this reference is symbolic.
- It does not point to a specific commit, but to another Head, which itself
- specifies a commit.
-
- A typical example for a symbolic reference is HEAD."""
- __slots__ = ("repo", "path")
-
- _resolve_ref_on_create = False
- _points_to_commits_only = True
- _common_path_default = ""
- _id_attribute_ = "name"
-
- re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$')
-
- #{ Configuration
- # Object class to be used when instantiating objects
- ObjectCls = Object
- CommitCls = Commit
-
- # all of the following are set by the package initializer
- HEADCls = None
- HeadCls = None
- RemoteReferenceCls = None
- TagReferenceCls = None
- ReferenceCls = None
- #}END configuration
-
- def __init__(self, repo, path):
- self.repo = repo
- self.path = path
-
- def __str__(self):
- return self.path
-
- def __repr__(self):
- return '<git.%s "%s">' % (self.__class__.__name__, self.path)
-
- def __eq__(self, other):
- return self.path == other.path
-
- def __ne__(self, other):
- return not ( self == other )
-
- def __hash__(self):
- return hash(self.path)
-
- @property
- def name(self):
- """
- :return:
- In case of symbolic references, the shortest assumable name
- is the path itself."""
- return self.path
-
- @property
- def abspath(self):
- return join_path_native(self.repo.git_path(), self.path)
-
- @classmethod
- def _get_packed_refs_path(cls, repo):
- return join(repo.git_path(), 'packed-refs')
-
- @classmethod
- def _iter_packed_refs(cls, repo):
- """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs.
- :note: The packed refs file will be kept open as long as we iterate"""
- try:
- fp = open(cls._get_packed_refs_path(repo), 'r')
- for line in fp:
- line = line.strip()
- if not line:
- continue
- if line.startswith('#'):
- if line.startswith('# pack-refs with:') and not line.endswith('peeled'):
- raise TypeError("PackingType of packed-Refs not understood: %r" % line)
- # END abort if we do not understand the packing scheme
- continue
- # END parse comment
-
- # skip dereferenced tag object entries - previous line was actual
- # tag reference for it
- if line[0] == '^':
- continue
-
- yield tuple(line.split(' ', 1))
- # END for each line
- except (OSError,IOError):
- raise StopIteration
- # END no packed-refs file handling
- # NOTE: Had try-finally block around here to close the fp,
- # but some python version wouldn't allow yields within that.
- # I believe files are closing themselves on destruction, so it is
- # alright.
-
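The packed-refs format consumed above looks roughly as follows; a standalone parsing sketch with placeholder shas:

sample = (
    "# pack-refs with: peeled\n"
    + "a" * 40 + " refs/heads/master\n"
    + "b" * 40 + " refs/tags/v1\n"
    + "^" + "c" * 40 + "\n"   # peeled target of the preceding tag, skipped
)
pairs = []
for line in sample.splitlines():
    line = line.strip()
    if not line or line.startswith('#') or line[0] == '^':
        continue
    pairs.append(tuple(line.split(' ', 1)))
assert pairs == [("a" * 40, "refs/heads/master"), ("b" * 40, "refs/tags/v1")]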
- @classmethod
- def dereference_recursive(cls, repo, ref_path):
- """
- :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all
- intermediate references as required
- :param repo: the repository containing the reference at ref_path"""
- while True:
- hexsha, ref_path = cls._get_ref_info(repo, ref_path)
- if hexsha is not None:
- return hexsha
- # END recursive dereferencing
-
- @classmethod
- def _get_ref_info(cls, repo, ref_path):
- """Return: (sha, target_ref_path) if available, the sha the file at
- rela_path points to, or None. target_ref_path is the reference we
- point to, or None"""
- tokens = None
- try:
- fp = open(join(repo.git_path(), ref_path), 'r')
- value = fp.read().rstrip()
- fp.close()
- tokens = value.split(" ")
- except (OSError,IOError):
- # Probably we are just packed, find our entry in the packed refs file
- # NOTE: We are not a symbolic ref if we are in a packed file, as these
- # are excluded explicitly
- for sha, path in cls._iter_packed_refs(repo):
- if path != ref_path: continue
- tokens = (sha, path)
- break
- # END for each packed ref
- # END handle packed refs
- if tokens is None:
- raise ValueError("Reference at %r does not exist" % ref_path)
-
- # is it a reference ?
- if tokens[0] == 'ref:':
- return (None, tokens[1])
-
- # it's a commit
- if cls.re_hexsha_only.match(tokens[0]):
- return (tokens[0], None)
-
- raise ValueError("Failed to parse reference information from %r" % ref_path)
-
- def _get_object_sha(self):
- """
- :return:
- The binary sha of the object our ref currently refers to. Refs can be cached; they will
- always point to the actual object as it gets re-created on each query"""
- return hex_to_bin(self.dereference_recursive(self.repo, self.path))
-
- def _get_object(self):
- """
- :return:
- The object our ref currently refers to."""
- # have to be dynamic here as we may be a tag which can point to anything
- # Our path will be resolved to the hexsha which will be used accordingly
- return self.ObjectCls.new_from_sha(self.repo, self._get_object_sha())
-
- def set_object(self, object_id, logmsg = None):
- """Set the object we point to, possibly dereference our symbolic reference first.
- If the reference does not exist, it will be created
-
- :param object_id: a reference specifier string, a SymbolicReference or an object hex sha.
- SymbolicReferences will be dereferenced beforehand to obtain the object they point to
- :param logmsg: If not None, the message will be used in the reflog entry to be
- written. Otherwise the reflog is not altered
- :note: plain SymbolicReferences may not actually point to objects by convention
- :return: self"""
- if isinstance(object_id, SymbolicReference):
- object_id = object_id.object
- #END resolve references
-
- is_detached = True
- try:
- is_detached = self.is_detached
- except ValueError:
- pass
- # END handle non-existing ones
-
- if is_detached:
- return self.set_reference(object_id, logmsg)
-
- # set the commit on our reference
- return self._get_reference().set_object(object_id, logmsg)
-
- def _get_commit(self):
- """
- :return:
- Commit object we point to, works for detached and non-detached
- SymbolicReferences. The symbolic reference will be dereferenced recursively."""
- obj = self._get_object()
- if obj.type == 'tag':
- obj = obj.object
- #END dereference tag
-
- if obj.type != self.CommitCls.type:
- raise TypeError("Symbolic Reference pointed to object %r, commit was required" % obj)
- #END handle type
- return obj
-
- def set_commit(self, commit, logmsg = None):
- """As set_object, but restricts the type of object to be a Commit
-
- :raise ValueError: If commit is not a Commit object or doesn't point to
- a commit
- :return: self"""
- # check the type - assume the best if it is a base-string
- is_invalid_type = False
- if isinstance(commit, self.ObjectCls):
- is_invalid_type = commit.type != self.CommitCls.type
- elif isinstance(commit, SymbolicReference):
- is_invalid_type = commit.object.type != self.CommitCls.type
- else:
- try:
- is_invalid_type = self.repo.resolve(commit).type != self.CommitCls.type
- except BadObject:
- raise ValueError("Invalid object: %s" % commit)
- #END handle exception
- # END verify type
-
- if is_invalid_type:
- raise ValueError("Need commit, got %r" % commit)
- #END handle raise
-
- # we leave strings to the rev-parse method below
- self.set_object(commit, logmsg)
-
- return self
-
-
- commit = property(_get_commit, set_commit, doc="Query or set commits directly")
- object = property(_get_object, set_object, doc="Return the object our ref currently refers to")
- object_binsha = property(_get_object_sha, set_object, doc="Return the object our ref currently refers to")
-
- def _get_reference(self):
- """:return: Reference Object we point to
- :raise TypeError: If this symbolic reference is detached, hence it doesn't point
- to a reference, but to a commit"""
- sha, target_ref_path = self._get_ref_info(self.repo, self.path)
- if target_ref_path is None:
- raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha))
- return self.from_path(self.repo, target_ref_path)
-
- def set_reference(self, ref, logmsg = None):
- """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference.
- Otherwise an Object, given as Object instance or refspec, is assumed and if valid,
- will be set which effectively detaches the reference if it was a purely
- symbolic one.
-
- :param ref: SymbolicReference instance, hexadecimal sha string or refspec string
- Only if the ref is a SymbolicRef instance, we will point to it. Everything
- else is dereferenced to obtain the actual object.
- :param logmsg: If set to a string, the message will be used in the reflog.
- Otherwise, a reflog entry is not written for the changed reference.
- The previous commit of the entry will be the commit we point to now.
-
- See also: log_append()
-
- :return: self
- :note: This symbolic reference will not be dereferenced. For that, see
- ``set_object(...)``"""
- write_value = None
- obj = None
- if isinstance(ref, SymbolicReference):
- write_value = "ref: %s" % ref.path
- elif isinstance(ref, self.ObjectCls):
- obj = ref
- write_value = ref.hexsha
- elif isinstance(ref, basestring):
- try:
- obj = self.repo.resolve(ref+"^{}") # optionally deref tags
- write_value = obj.hexsha
- except BadObject:
- raise ValueError("Could not extract object from %s" % ref)
- # END end try string
- else:
- raise ValueError("Unrecognized Value: %r" % ref)
- # END try commit attribute
-
- # typecheck
- if obj is not None and self._points_to_commits_only and obj.type != Commit.type:
- raise TypeError("Require commit, got %r" % obj)
- #END verify type
-
- oldbinsha = None
- if logmsg is not None:
- try:
- oldbinsha = self.commit.binsha
- except ValueError:
- oldbinsha = Commit.NULL_BIN_SHA
- #END handle non-existing
- #END retrieve old hexsha
-
- fpath = self.abspath
- assure_directory_exists(fpath, is_file=True)
-
- lfd = LockedFD(fpath)
- fd = lfd.open(write=True, stream=True)
- fd.write(write_value)
- lfd.commit()
-
- # Adjust the reflog
- if logmsg is not None:
- self.log_append(oldbinsha, logmsg)
- #END handle reflog
-
- return self
-
-
- # aliased reference
- reference = property(_get_reference, set_reference, doc="Returns the Reference we point to")
- ref = reference
-
- def is_valid(self):
- """
- :return:
- True if the reference is valid, hence it can be read and points to
- a valid object or reference."""
- try:
- self.object
- except (OSError, ValueError, BadObject):
- return False
- else:
- return True
-
- @property
- def is_detached(self):
- """
- :return:
- True if we are a detached reference, hence we point to a specific commit
- instead of to another reference"""
- try:
- self.ref
- return False
- except TypeError:
- return True
-
- def log(self):
- """
- :return: RefLog for this reference. Its last entry reflects the latest change
- applied to this reference
-
- .. note:: As the log is parsed every time, it's recommended to cache it for use
- instead of calling this method repeatedly. It should be considered read-only."""
- return RefLog.from_file(RefLog.path(self))
-
- def log_append(self, oldbinsha, message, newbinsha=None):
- """Append a logentry to the logfile of this ref
-
- :param oldbinsha: binary sha this ref used to point to
- :param message: A message describing the change
- :param newbinsha: The sha the ref points to now. If None, our current commit sha
- will be used
- :return: added RefLogEntry instance"""
- return RefLog.append_entry(self.repo.config_reader(), RefLog.path(self), oldbinsha,
- (newbinsha is None and self.commit.binsha) or newbinsha,
- message)
-
- def log_entry(self, index):
- """:return: RefLogEntry at the given index
- :param index: python list compatible positive or negative index
-
- .. note:: This method must read part of the reflog during execution, hence
- it should be used sparingly, or only if you need just one index.
- In that case, it will be faster than the ``log()`` method"""
- return RefLog.entry_at(RefLog.path(self), index)
-
- @classmethod
- def to_full_path(cls, path):
- """
- :return: string with a full repository-relative path which can be used to initialize
- a Reference instance, for instance by using ``Reference.from_path``"""
- if isinstance(path, SymbolicReference):
- path = path.path
- full_ref_path = path
- if not cls._common_path_default:
- return full_ref_path
- if not path.startswith(cls._common_path_default+"/"):
- full_ref_path = '%s/%s' % (cls._common_path_default, path)
- return full_ref_path
-
- @classmethod
- def delete(cls, repo, path):
- """Delete the reference at the given path
-
- :param repo:
- Repository to delete the reference from
-
- :param path:
- Short or full path pointing to the reference, i.e. refs/myreference
- or just "myreference", hence 'refs/' is implied.
- Alternatively the symbolic reference to be deleted"""
- full_ref_path = cls.to_full_path(path)
- abs_path = join(repo.git_path(), full_ref_path)
- if exists(abs_path):
- os.remove(abs_path)
- else:
- # check packed refs
- pack_file_path = cls._get_packed_refs_path(repo)
- try:
- reader = open(pack_file_path)
- except (OSError,IOError):
- pass # it didn't exist at all
- else:
- new_lines = list()
- made_change = False
- dropped_last_line = False
- for line in reader:
- # keep line if it is a comment or if the ref to delete is not
- # in the line
- # If we deleted the last line and this one is a tag-reference object,
- # we drop it as well
- if ( line.startswith('#') or full_ref_path not in line ) and \
- ( not dropped_last_line or dropped_last_line and not line.startswith('^') ):
- new_lines.append(line)
- dropped_last_line = False
- continue
- # END skip comments and lines without our path
-
- # drop this line
- made_change = True
- dropped_last_line = True
- # END for each line in packed refs
- reader.close()
-
- # write the new lines
- if made_change:
- open(pack_file_path, 'w').writelines(new_lines)
- # END open exception handling
- # END handle deletion
-
- # delete the reflog
- reflog_path = RefLog.path(cls(repo, full_ref_path))
- if os.path.isfile(reflog_path):
- os.remove(reflog_path)
- #END remove reflog
-
-
- @classmethod
- def _create(cls, repo, path, resolve, reference, force, logmsg=None):
- """internal method used to create a new symbolic reference.
- If resolve is False, the reference will be taken as is, creating
- a proper symbolic reference. Otherwise it will be resolved to the
- corresponding object and a detached symbolic reference will be created
- instead"""
- full_ref_path = cls.to_full_path(path)
- abs_ref_path = join(repo.git_path(), full_ref_path)
-
- # figure out target data
- target = reference
- if resolve:
- # could just use the resolve method, but it could be expensive
- # so we handle most common cases ourselves
- if isinstance(reference, cls.ObjectCls):
- target = reference.hexsha
- elif isinstance(reference, SymbolicReference):
- target = reference.object.hexsha
- else:
- target = repo.resolve(str(reference))
- #END handle resolution
- #END need resolution
-
- if not force and isfile(abs_ref_path):
- target_data = str(target)
- if isinstance(target, SymbolicReference):
- target_data = target.path
- if not resolve:
- target_data = "ref: " + target_data
- existing_data = open(abs_ref_path, 'rb').read().strip()
- if existing_data != target_data:
- raise OSError("Reference at %r does already exist, pointing to %r, requested was %r" % (full_ref_path, existing_data, target_data))
- # END no force handling
-
- ref = cls(repo, full_ref_path)
- ref.set_reference(target, logmsg)
- return ref
-
- @classmethod
- def create(cls, repo, path, reference='HEAD', force=False, logmsg=None):
- """Create a new symbolic reference, hence a reference pointing to another reference.
-
- :param repo:
- Repository to create the reference in
-
- :param path:
- full path at which the new symbolic reference is supposed to be
- created, i.e. "NEW_HEAD" or "symrefs/my_new_symref"
-
- :param reference:
- The reference to which the new symbolic reference should point.
- If it is a commit-ish, the symbolic ref will be detached.
-
- :param force:
- if True, force creation even if a symbolic reference with that name already exists.
- Raise OSError otherwise
-
- :param logmsg:
- If not None, the message to append to the reflog. Otherwise no reflog
- entry is written.
-
- :return: Newly created symbolic Reference
-
- :raise OSError:
- If a (Symbolic)Reference with the same name but different contents
- already exists.
-
- :note: This does not alter the current HEAD, index or Working Tree"""
- return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg)
-
- def rename(self, new_path, force=False):
- """Rename self to a new path
-
- :param new_path:
- Either a simple name or a full path, i.e. new_name or features/new_name.
- The prefix refs/ is implied for references and will be set as needed.
- In case this is a symbolic ref, there is no implied prefix
-
- :param force:
- If True, the rename will succeed even if a head with the target name
- already exists. It will be overwritten in that case
-
- :return: self
- :raise OSError: If a file at the given path already exists with different contents"""
- new_path = self.to_full_path(new_path)
- if self.path == new_path:
- return self
-
- new_abs_path = join(self.repo.git_path(), new_path)
- cur_abs_path = join(self.repo.git_path(), self.path)
- if isfile(new_abs_path):
- if not force:
- # if they point to the same file, it's not an error
- if open(new_abs_path,'rb').read().strip() != open(cur_abs_path,'rb').read().strip():
- raise OSError("File at path %r already exists" % new_abs_path)
- # else: we could remove ourselves and use the other one, but
- # for clarity we just continue as usual
- # END not force handling
- os.remove(new_abs_path)
- # END handle existing target file
-
- dname = dirname(new_abs_path)
- if not isdir(dname):
- os.makedirs(dname)
- # END create directory
-
- rename(cur_abs_path, new_abs_path)
- self.path = new_path
-
- return self
-
- @classmethod
- def _iter_items(cls, repo, common_path = None):
- if common_path is None:
- common_path = cls._common_path_default
- rela_paths = set()
-
- # walk loose refs
- # Currently we do not follow links
- for root, dirs, files in os.walk(join_path_native(repo.git_path(), common_path)):
- if 'refs/' not in root: # skip non-refs subfolders
- refs_id = [ i for i,d in enumerate(dirs) if d == 'refs' ]
- if refs_id:
- dirs[0:] = ['refs']
- # END prune non-refs folders
-
- for f in files:
- abs_path = to_native_path_linux(join_path(root, f))
- rela_paths.add(abs_path.replace(to_native_path_linux(repo.git_path()) + '/', ""))
- # END for each file in root directory
- # END for each directory to walk
-
- # read packed refs
- for sha, rela_path in cls._iter_packed_refs(repo):
- if rela_path.startswith(common_path):
- rela_paths.add(rela_path)
- # END relative path matches common path
- # END packed refs reading
-
- # return paths in sorted order
- for path in sorted(rela_paths):
- try:
- yield cls.from_path(repo, path)
- except ValueError:
- continue
- # END for each sorted relative refpath
-
- @classmethod
- def iter_items(cls, repo, common_path = None):
- """Find all refs in the repository
-
- :param repo: the repository to list the refs of
-
- :param common_path:
- Optional keyword argument to the path which is to be shared by all
- returned Ref objects.
- Defaults to class specific portion if None assuring that only
- refs suitable for the actual class are returned.
-
- :return:
- git.SymbolicReference[], each of them is guaranteed to be either *only* a
- symbolic ref, or a derived class which is not detached
-
- The list is lexicographically sorted
- The returned objects represent actual subclasses, such as Head or TagReference"""
- return ( r for r in cls._iter_items(repo, common_path) if r.__class__ == cls or not r.is_detached )
-
- @classmethod
- def from_path(cls, repo, path):
- """
- :param path: full .git-directory-relative path name to the Reference to instantiate
- :note: use to_full_path() if you only have a partial path of a known Reference Type
- :return:
- Instance of type Reference, Head, or Tag
- depending on the given path"""
- if not path:
- raise ValueError("Cannot create Reference from %r" % path)
-
- for ref_type in (cls.HEADCls, cls.HeadCls, cls.RemoteReferenceCls, cls.TagReferenceCls, cls.ReferenceCls, cls):
- try:
- instance = ref_type(repo, path)
- if instance.__class__ == SymbolicReference and instance.is_detached:
- raise ValueError("SymbolRef was detached, we drop it")
- return instance
- except ValueError:
- pass
- # END exception handling
- # END for each type to try
- raise ValueError("Could not find reference type suitable to handle path %r" % path)
diff --git a/gitdb/ref/tag.py b/gitdb/ref/tag.py
index 89e67ac..8b13789 100644
--- a/gitdb/ref/tag.py
+++ b/gitdb/ref/tag.py
@@ -1,50 +1 @@
-from reference import Reference
-__all__ = ["TagReference", "Tag"]
-
-
-
-class TagReference(Reference):
- """Class representing a lightweight tag reference which either points to a commit
- ,a tag object or any other object. In the latter case additional information,
- like the signature or the tag-creator, is available.
-
- This tag object will always point to a commit object, but may carry additional
- information in a tag object::
-
- tagref = TagReference.list_items(repo)[0]
- print tagref.commit.message
- if tagref.tag is not None:
- print tagref.tag.message"""
-
- __slots__ = tuple()
- _common_path_default = "refs/tags"
-
- @property
- def commit(self):
- """:return: Commit object the tag ref points to"""
- obj = self.object
- if obj.type == "commit":
- return obj
- elif obj.type == "tag":
- # it is a tag object which carries the commit as an object - we can point to anything
- return obj.object
- else:
- raise ValueError( "Tag %s points to a Blob or Tree - have never seen that before" % self )
-
- @property
- def tag(self):
- """
- :return: Tag object this tag ref points to or None in case
- we are a light weight tag"""
- obj = self.object
- if obj.type == "tag":
- return obj
- return None
-
- # make object read-only
- # It should be reasonably hard to adjust an existing tag
- object = property(Reference._get_object)
-
-# provide an alias
-Tag = TagReference
diff --git a/gitdb/test/__init__.py b/gitdb/test/__init__.py
index 760f531..a19fc87 100644
--- a/gitdb/test/__init__.py
+++ b/gitdb/test/__init__.py
@@ -3,14 +3,5 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-import gitdb.util
-#{ Initialization
-def _init_pool():
- """Assure the pool is actually threaded"""
- size = 2
- print "Setting ThreadPool to %i" % size
- gitdb.util.pool.set_size(size)
-
-#} END initialization
diff --git a/gitdb/test/db/test_base.py b/gitdb/test/db/test_base.py
index 0a381be..8b13789 100644
--- a/gitdb/test/db/test_base.py
+++ b/gitdb/test/db/test_base.py
@@ -1,18 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
-from gitdb.db import RefSpec
-class TestBase(TestDBBase):
-
- @with_rw_directory
- def test_basics(self, path):
- self.failUnlessRaises(ValueError, RefSpec, None, None)
- rs = RefSpec(None, "something")
- assert rs.force == False
- assert rs.delete_destination()
- assert rs.source is None
- assert rs.destination == "something"
-
diff --git a/gitdb/test/lib.py b/gitdb/test/lib.py
index 9224f5f..8b13789 100644
--- a/gitdb/test/lib.py
+++ b/gitdb/test/lib.py
@@ -1,200 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
- # This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Utilities used in ODB testing"""
-from gitdb import OStream
-from gitdb.db.py import PureGitDB
-from gitdb.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
-
-from gitdb.util import zlib
-
-import sys
-import random
-from array import array
-from cStringIO import StringIO
-
-import glob
-import unittest
-import tempfile
-import shutil
-import os
-import gc
-
-
-#{ Decorators
-
-def with_rw_directory(func):
- """Create a temporary directory which can be written to, remove it if the
- test succeeds, but leave it otherwise to aid additional debugging"""
- def wrapper(self):
- path = tempfile.mktemp(prefix=func.__name__)
- os.mkdir(path)
- keep = False
- try:
- try:
- return func(self, path)
- except Exception:
- print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
- keep = True
- raise
- finally:
- # Need to collect here to be sure all handles have been closed. It appears
- # to be a windows-only issue. In fact things should be deleted, as well as
- # memory maps closed, once objects go out of scope. For some reason
- # though this is not the case here unless we collect explicitly.
- if not keep:
- gc.collect()
- shutil.rmtree(path)
- # END handle exception
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return wrapper
-
-
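A usage sketch for the decorator above; the test class and the file name are hypothetical:

class ExampleTest(unittest.TestCase):
    @with_rw_directory
    def test_write(self, path):
        # 'path' is a fresh temporary directory, removed again on success
        open(os.path.join(path, 'probe'), 'w').write('x')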
-def with_rw_repo(func):
- """Create a copy of our repository and put it into a writable location. It will
- be removed if the test doesn't result in an error.
- As we can currently only copy the fully working tree, tests must not rely on
- being on a certain branch or on anything really except for the default tags
- that should exist.
- The wrapped function obtains a git repository."""
- def wrapper(self, path):
- src_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
- assert(os.path.isdir(path))
- os.rmdir(path) # created by wrapper, but must not exist for copy operation
- shutil.copytree(src_dir, path)
- target_gitdir = os.path.join(path, '.git')
- assert os.path.isdir(target_gitdir)
- return func(self, PureGitDB(target_gitdir))
- #END wrapper
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
-
-
-
-def with_packs_rw(func):
- """Function that provides a path into which the packs for testing should be
- copied. Will pass on the path to the actual function afterwards
-
- :note: needs with_rw_directory wrapped around it"""
- def wrapper(self, path):
- src_pack_glob = fixture_path('packs/*')
- copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
- return func(self, path)
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
-
-#} END decorators
-
-#{ Routines
-
-def repo_dir():
- """:return: path to our own repository, being our own .git directory.
- :note: doesn't work in bare repositories"""
- base = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.git')
- assert os.path.isdir(base)
- return base
-
-
-def maketemp(*args):
- """Wrapper around default tempfile.mktemp to fix an osx issue"""
- tdir = tempfile.mktemp(*args)
- if sys.platform == 'darwin':
- tdir = '/private' + tdir
- return tdir
-
-def fixture_path(relapath=''):
- """:return: absolute path into the fixture directory
- :param relapath: relative path into the fixtures directory, or ''
- to obtain the fixture directory itself"""
- return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)
-
-def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
- """Copy all files found according to the given source glob into the target directory
- :param hard_link_ok: if True, hard links will be created if possible. Otherwise
- the files will be copied"""
- for src_file in glob.glob(source_glob):
- if hard_link_ok and hasattr(os, 'link'):
- target = os.path.join(target_dir, os.path.basename(src_file))
- try:
- os.link(src_file, target)
- except OSError:
- shutil.copy(src_file, target_dir)
- # END handle cross device links ( and resulting failure )
- else:
- shutil.copy(src_file, target_dir)
- # END try hard link
- # END for each file to copy
-
-
-def make_bytes(size_in_bytes, randomize=False):
- """:return: string with given size in bytes
- :param randomize: try to produce a very random stream"""
- actual_size = size_in_bytes / 4
- producer = xrange(actual_size)
- if randomize:
- producer = list(producer)
- random.shuffle(producer)
- # END randomize
- a = array('i', producer)
- return a.tostring()
-
-def make_object(type, data):
- """:return: bytes resembling an uncompressed object"""
- odata = "blob %i\0" % len(data)
- return odata + data
-
-def make_memory_file(size_in_bytes, randomize=False):
- """:return: tuple(size_of_stream, stream)
- :param randomize: try to produce a very random stream"""
- d = make_bytes(size_in_bytes, randomize)
- return len(d), StringIO(d)
-
-#} END routines
-
-#{ Stream Utilities
-
-class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
-
-
-class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
-
- #} END stream utilities
-
-#{ Bases
-
-class TestBase(unittest.TestCase):
- """Base class for all tests"""
- # The non-database specific tests just provide a default pure git database
- rorepo = PureGitDB(repo_dir())
-
-#} END bases
diff --git a/gitdb/test/object/test_blob.py b/gitdb/test/object/test_blob.py
index 661c050..139597f 100644
--- a/gitdb/test/object/test_blob.py
+++ b/gitdb/test/object/test_blob.py
@@ -1,23 +1,2 @@
-# test_blob.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.test.lib import *
-from git import *
-from gitdb.util import hex_to_bin
-class TestBlob(TestBase):
-
- def test_mime_type_should_return_mime_type_for_known_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'})
- assert_equal("image/png", blob.mime_type)
-
- def test_mime_type_should_return_text_plain_for_unknown_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'})
- assert_equal("text/plain", blob.mime_type)
-
- def test_nodict(self):
- self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2)
-
diff --git a/gitdb/test/object/test_commit.py b/gitdb/test/object/test_commit.py
index 4a8d8b8..8b13789 100644
--- a/gitdb/test/object/test_commit.py
+++ b/gitdb/test/object/test_commit.py
@@ -1,275 +1 @@
-# -*- coding: utf-8 -*-
-# test_commit.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.test.lib import *
-from git import *
-from gitdb import IStream
-from gitdb.util import hex_to_bin
-
-from cStringIO import StringIO
-import time
-import sys
-
-
-def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False):
- """traverse all commits in the history of commit identified by commit_id and check
- if the serialization works.
- :param print_performance_info: if True, we will show how fast we are"""
- ns = 0 # num serializations
- nds = 0 # num deserializations
-
- st = time.time()
- for cm in rwrepo.commit(commit_id).traverse():
- nds += 1
-
- # assert that we deserialize commits correctly, hence we get the same
- # sha on serialization
- stream = StringIO()
- cm._serialize(stream)
- ns += 1
- streamlen = stream.tell()
- stream.seek(0)
-
- istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream))
- assert istream.hexsha == cm.hexsha
-
- nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree,
- cm.author, cm.authored_date, cm.author_tz_offset,
- cm.committer, cm.committed_date, cm.committer_tz_offset,
- cm.message, cm.parents, cm.encoding)
-
- assert nc.parents == cm.parents
- stream = StringIO()
- nc._serialize(stream)
- ns += 1
- streamlen = stream.tell()
- stream.seek(0)
-
- # reuse istream
- istream.size = streamlen
- istream.stream = stream
- istream.binsha = None
- nc.binsha = rwrepo.odb.store(istream).binsha
-
- # if it worked, we have exactly the same contents !
- assert nc.hexsha == cm.hexsha
- # END check commits
- elapsed = time.time() - st
-
- if print_performance_info:
- print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed)
- # END handle performance info
-
-
-class TestCommit(TestBase):
-
- def test_bake(self):
-
- commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae')
- # commits have no dict
- self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1)
- commit.author # bake
-
- assert_equal("Sebastian Thiel", commit.author.name)
- assert_equal("byronimo@gmail.com", commit.author.email)
- assert commit.author == commit.committer
- assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int)
- assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int)
- assert commit.message == "Added missing information to docstrings of commit and stats module\n"
-
-
- def test_stats(self):
- commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781')
- stats = commit.stats
-
- def check_entries(d):
- assert isinstance(d, dict)
- for key in ("insertions", "deletions", "lines"):
- assert key in d
- # END assertion helper
- assert stats.files
- assert stats.total
-
- check_entries(stats.total)
- assert "files" in stats.total
-
- for filepath, d in stats.files.items():
- check_entries(d)
- # END for each stated file
-
- # assure data is parsed properly
- michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
- assert commit.author == michael
- assert commit.committer == michael
- assert commit.authored_date == 1210193388
- assert commit.committed_date == 1210193388
- assert commit.author_tz_offset == 14400, commit.author_tz_offset
- assert commit.committer_tz_offset == 14400, commit.committer_tz_offset
- assert commit.message == "initial project\n"
-
- def test_unicode_actor(self):
- # assure we can parse unicode actors correctly
- name = "Üäöß ÄußÉ".decode("utf-8")
- assert len(name) == 9
- special = Actor._from_string(u"%s <something@this.com>" % name)
- assert special.name == name
- assert isinstance(special.name, unicode)
-
- def test_traversal(self):
- start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
- first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
- p0 = start.parents[0]
- p1 = start.parents[1]
- p00 = p0.parents[0]
- p10 = p1.parents[0]
-
- # basic branch first, depth first
- dfirst = start.traverse(branch_first=False)
- bfirst = start.traverse(branch_first=True)
- assert dfirst.next() == p0
- assert dfirst.next() == p00
-
- assert bfirst.next() == p0
- assert bfirst.next() == p1
- assert bfirst.next() == p00
- assert bfirst.next() == p10
-
- # at some point, both iterations should stop
- assert list(bfirst)[-1] == first
- stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
- l = list(stoptraverse)
- assert len(l[0]) == 2
-
- # ignore self
- assert start.traverse(ignore_self=False).next() == start
-
- # depth
- assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
-
- # prune
- assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
-
- # predicate
- assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
-
- # traversal should stop when the beginning is reached
- self.failUnlessRaises(StopIteration, first.traverse().next)
-
- # parents of the first commit should be empty ( as the only parent has a null
- # sha )
- assert len(first.parents) == 0
-
- def test_iteration(self):
- # we can iterate commits
- all_commits = Commit.list_items(self.rorepo, self.rorepo.head)
- assert all_commits
- assert all_commits == list(self.rorepo.iter_commits())
-
- # this includes merge commits
- mcommit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d')
- assert mcommit in all_commits
-
- # we can limit the result to paths
- ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
- assert ltd_commits and len(ltd_commits) < len(all_commits)
-
- # show commits of multiple paths, resulting in a union of commits
- less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
- assert len(ltd_commits) < len(less_ltd_commits)
-
- def test_iter_items(self):
- # pretty not allowed
- self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw")
-
- def test_rev_list_bisect_all(self):
- """
- 'git rev-list --bisect-all' returns additional information
- in the commit header. This test ensures that we properly parse it.
- """
- revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474',
- first_parent=True,
- bisect_all=True)
-
- commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs))
- expected_ids = (
- '7156cece3c49544abb6bf7a0c218eb36646fad6d',
- '1f66cfbbce58b4b552b041707a12d437cc5f400a',
- '33ebe7acec14b25c5f84f35a664803fcab2f7781',
- '933d23bf95a5bd1624fbcdf328d904e1fa173474'
- )
- for sha1, commit in zip(expected_ids, commits):
- assert_equal(sha1, commit.hexsha)
-
- def test_count(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
-
- def test_list(self):
- assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
-
- def test_str(self):
- commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- assert_equal(Commit.NULL_HEX_SHA, str(commit))
-
- def test_repr(self):
- commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- assert_equal('<git.Commit "%s">' % Commit.NULL_HEX_SHA, repr(commit))
-
- def test_equality(self):
- commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit3 = Commit(self.rorepo, "\1"*20)
- assert_equal(commit1, commit2)
- assert_not_equal(commit2, commit3)
-
- def test_iter_parents(self):
- # should return all but ourselves, even if skip is defined
- c = self.rorepo.commit('0.1.5')
- for skip in (0, 1):
- piter = c.iter_parents(skip=skip)
- first_parent = piter.next()
- assert first_parent != c
- assert first_parent == c.parents[0]
- # END for each
-
- def test_base(self):
- name_rev = self.rorepo.head.commit.name_rev
- assert isinstance(name_rev, basestring)
-
- @with_rw_repo('HEAD', bare=True)
- def test_serialization(self, rwrepo):
- # create all commits of our repo
- assert_commit_serialization(rwrepo, '0.1.6')
-
- def test_serialization_unicode_support(self):
- assert Commit.default_encoding.lower() == 'utf-8'
-
- # create a commit with unicode in the message, and the author's name
- # Verify its serialization and deserialization
- cmt = self.rorepo.commit('0.1.6')
- assert isinstance(cmt.message, unicode) # it automatically decodes it as such
- assert isinstance(cmt.author.name, unicode) # same here
-
- cmt.message = "üäêèß".decode("utf-8")
- assert len(cmt.message) == 5
-
- cmt.author.name = "äüß".decode("utf-8")
- assert len(cmt.author.name) == 3
-
- cstream = StringIO()
- cmt._serialize(cstream)
- cstream.seek(0)
- assert len(cstream.getvalue())
-
- ncmt = Commit(self.rorepo, cmt.binsha)
- ncmt._deserialize(cstream)
-
- assert cmt.author.name == ncmt.author.name
- assert cmt.message == ncmt.message
- # actually, it appears it can't be printed in a shell, as repr wants
- # to have ascii only
- cmt.author.__repr__()
-
diff --git a/gitdb/test/object/test_tree.py b/gitdb/test/object/test_tree.py
index ec10e96..8b13789 100644
--- a/gitdb/test/object/test_tree.py
+++ b/gitdb/test/object/test_tree.py
@@ -1,144 +1 @@
-# test_tree.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import os
-from git.test.lib import *
-from git import *
-from git.objects.fun import (
- traverse_tree_recursive,
- traverse_trees_recursive
- )
-from cStringIO import StringIO
-
-class TestTree(TestBase):
-
- def test_serializable(self):
- # tree at the given commit contains a submodule as well
- roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c')
- for item in roottree.traverse(ignore_self=False):
- if item.type != Tree.type:
- continue
- # END skip non-trees
- tree = item
- # trees have no dict
- self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1)
-
- orig_data = tree.data_stream.read()
- orig_cache = tree._cache
-
- stream = StringIO()
- tree._serialize(stream)
- assert stream.getvalue() == orig_data
-
- stream.seek(0)
- testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '')
- testtree._deserialize(stream)
- assert testtree._cache == orig_cache
-
-
- # TEST CACHE MUTATOR
- mod = testtree.cache
- self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name")
- self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode")
- self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name")
-
- # add new item
- name = "fake_dir"
- mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
- assert name in testtree
-
- # it's available in the tree immediately
- assert isinstance(testtree[name], Tree)
-
- # adding it again will not cause multiple of them to be present
- cur_count = len(testtree)
- mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
- assert len(testtree) == cur_count
-
- # fails with a different sha - name exists
- hexsha = "1"*40
- self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name)
-
- # force it - replace existing one
- mod.add(hexsha, tree.mode, name, force=True)
- assert testtree[name].hexsha == hexsha
- assert len(testtree) == cur_count
-
- # unchecked addition always works, even with invalid items
- invalid_name = "hi/there"
- mod.add_unchecked(hexsha, 0, invalid_name)
- assert len(testtree) == cur_count + 1
-
- del(mod[invalid_name])
- assert len(testtree) == cur_count
- # del again, it's fine
- del(mod[invalid_name])
-
- # have added one item, we are done
- mod.set_done()
- mod.set_done() # multiple times are okay
-
- # serialize, it's different now
- stream = StringIO()
- testtree._serialize(stream)
- stream.seek(0)
- assert stream.getvalue() != orig_data
-
- # deserialization replaces the cache, but we make sure by deleting it first
- del(testtree._cache)
- testtree._deserialize(stream)
- assert name in testtree
- assert invalid_name not in testtree
- # END for each item in tree
-
- def test_traverse(self):
- root = self.rorepo.tree('0.1.6')
- num_recursive = 0
- all_items = list()
- for obj in root.traverse():
- if "/" in obj.path:
- num_recursive += 1
-
- assert isinstance(obj, (Blob, Tree))
- all_items.append(obj)
- # END for each object
- assert all_items == root.list_traverse()
-
- # limit recursion depth to 1 - should be the same as default iteration
- assert all_items
- assert 'CHANGES' in root
- assert len(list(root)) == len(list(root.traverse(depth=1)))
-
- # only choose trees
- trees_only = lambda i,d: i.type == "tree"
- trees = list(root.traverse(predicate = trees_only))
- assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
-
- # test prune
- lib_folder = lambda t,d: t.path == "lib"
- pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
- assert len(pruned_trees) < len(trees)
-
- # trees and blobs
- assert len(set(trees)|set(root.trees)) == len(trees)
- assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
- subitem = trees[0][0]
- assert "/" in subitem.path
- assert subitem.name == os.path.basename(subitem.path)
-
- # assure that at some point the traversed paths have a slash in them
- found_slash = False
- for item in root.traverse():
- assert os.path.isabs(item.abspath)
- if '/' in item.path:
- found_slash = True
- # END check for slash
-
- # slashes in paths are supported as well
- assert root[item.path] == item == root/item.path
- # END for each item
- assert found_slash
-
diff --git a/gitdb/test/performance/lib.py b/gitdb/test/performance/lib.py
index 761113d..cb6303f 100644
--- a/gitdb/test/performance/lib.py
+++ b/gitdb/test/performance/lib.py
@@ -3,52 +3,4 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Contains library functions"""
-import os
-from gitdb.test.lib import *
-import shutil
-import tempfile
-
-#{ Invariants
-k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE"
-#} END invariants
-
-
-#{ Utilities
-def resolve_or_fail(env_var):
- """:return: resolved environment variable or raise EnvironmentError"""
- try:
- return os.environ[env_var]
- except KeyError:
- raise EnvironmentError("Please set the %r environment variable and retry" % env_var)
- # END exception handling
-
-#} END utilities
-
-
-#{ Base Classes
-
-class TestBigRepoR(TestBase):
- """TestCase providing access to readonly 'big' repositories using the following
- member variables:
-
- * gitrepopath: read-only base path of the git source repository, i.e. .../git/.git"""
-
- #{ Invariants
- head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
- head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
- #} END invariants
-
- @classmethod
- def setUpAll(cls):
- try:
- super(TestBigRepoR, cls).setUpAll()
- except AttributeError:
- pass
- cls.gitrepopath = resolve_or_fail(k_env_git_repo)
- assert cls.gitrepopath.endswith('.git')
-
-
-#} END base classes
diff --git a/gitdb/test/performance/test_pack.py b/gitdb/test/performance/test_pack.py
index da952b1..8b13789 100644
--- a/gitdb/test/performance/test_pack.py
+++ b/gitdb/test/performance/test_pack.py
@@ -1,90 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Performance tests for object store"""
-from lib import (
- TestBigRepoR
- )
-from gitdb.exc import UnsupportedOperation
-from gitdb.db.pack import PackedDB
-
-import sys
-import os
-from time import time
-import random
-
-class TestPackedDBPerformance(TestBigRepoR):
-
- def _test_pack_random_access(self):
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-
- # sha lookup
- st = time()
- sha_list = list(pdb.sha_iter())
- elapsed = time() - st
- ns = len(sha_list)
- print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed)
-
- # sha lookup: best-case and worst-case access
- pdb_pack_info = pdb._pack_info
- st = time()
- for sha in sha_list:
- pdb_pack_info(sha)
- # END for each sha to look up
- elapsed = time() - st
-
- # discard cache
- del(pdb._entities)
- pdb.entities()
- print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed)
- # END for each random mode
-
- # query info and streams only
- max_items = 10000 # can wait longer when testing memory
- for pdb_fun in (pdb.info, pdb.stream):
- st = time()
- for sha in sha_list[:max_items]:
- pdb_fun(sha)
- elapsed = time() - st
- print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
- # END for each function
-
- # retrieve stream and read all
- max_items = 5000
- pdb_stream = pdb.stream
- total_size = 0
- st = time()
- for sha in sha_list[:max_items]:
- stream = pdb_stream(sha)
- stream.read()
- total_size += stream.size
- elapsed = time() - st
- total_kib = total_size / 1000
- print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed , elapsed, max_items / elapsed)
-
- def test_correctness(self):
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
- # disabled for now as it used to work perfectly, checking big repositories takes a long time
- print >> sys.stderr, "Endurance run: verify streaming of objects (crc and sha)"
- for crc in range(2):
- count = 0
- st = time()
- for entity in pdb.entities():
- pack_verify = entity.is_valid_stream
- sha_by_index = entity.index().sha
- for index in xrange(entity.index().size()):
- try:
- assert pack_verify(sha_by_index(index), use_crc=crc)
- count += 1
- except UnsupportedOperation:
- pass
- # END ignore old indices
- # END for each index
- # END for each entity
- elapsed = time() - st
- print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed)
- # END for each verify mode
-
diff --git a/gitdb/test/performance/test_stream.py b/gitdb/test/performance/test_stream.py
index 0e47484..d64346e 100644
--- a/gitdb/test/performance/test_stream.py
+++ b/gitdb/test/performance/test_stream.py
@@ -3,193 +3,4 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Performance data streaming performance"""
-from gitdb.db.py import *
-from gitdb.base import *
-from gitdb.stream import *
-from gitdb.util import (
- pool,
- bin_to_hex
- )
-from gitdb.typ import str_blob_type
-from gitdb.fun import chunk_size
-from async import (
- IteratorReader,
- ChannelThreadTask,
- )
-
-from cStringIO import StringIO
-from time import time
-import os
-import sys
-import stat
-import subprocess
-
-
-from lib import (
- TestBigRepoR,
- make_memory_file,
- with_rw_directory
- )
-
-
-#{ Utilities
-def read_chunked_stream(stream):
- total = 0
- while True:
- chunk = stream.read(chunk_size)
- total += len(chunk)
- if len(chunk) < chunk_size:
- break
- # END read stream loop
- assert total == stream.size
- return stream
-
-
-class TestStreamReader(ChannelThreadTask):
- """Expects input streams and reads them in chunks. It will read one at a time,
- requiring a queue chunk of size 1"""
- def __init__(self, *args):
- super(TestStreamReader, self).__init__(*args)
- self.fun = read_chunked_stream
- self.max_chunksize = 1
-
-
-#} END utilities
-
-class TestObjDBPerformance(TestBigRepoR):
-
- large_data_size_bytes = 1000*1000*50 # 50 MB should do it
- moderate_data_size_bytes = 1000*1000*1 # just 1 MB
-
- @with_rw_directory
- def test_large_data_streaming(self, path):
- ldb = PureLooseObjectODB(path)
- string_ios = list() # list of streams we previously created
-
- # serial mode
- for randomize in range(2):
- desc = (randomize and 'random ') or ''
- print >> sys.stderr, "Creating %s data ..." % desc
- st = time()
- size, stream = make_memory_file(self.large_data_size_bytes, randomize)
- elapsed = time() - st
- print >> sys.stderr, "Done (in %f s)" % elapsed
- string_ios.append(stream)
-
- # writing - due to the compression it will seem faster than it is
- st = time()
- sha = ldb.store(IStream('blob', size, stream)).binsha
- elapsed_add = time() - st
- assert ldb.has_object(sha)
- db_file = ldb.readable_db_object_path(bin_to_hex(sha))
- fsize_kib = os.path.getsize(db_file) / 1000
-
-
- size_kib = size / 1000
- print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
-
- # reading all at once
- st = time()
- ostream = ldb.stream(sha)
- shadata = ostream.read()
- elapsed_readall = time() - st
-
- stream.seek(0)
- assert shadata == stream.getvalue()
- print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
-
-
- # reading in chunks of 512 kB
- cs = 512*1000
- chunks = list()
- st = time()
- ostream = ldb.stream(sha)
- while True:
- data = ostream.read(cs)
- chunks.append(data)
- if len(data) < cs:
- break
- # END read in chunks
- elapsed_readchunks = time() - st
-
- stream.seek(0)
- assert ''.join(chunks) == stream.getvalue()
-
- cs_kib = cs / 1000
- print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
-
- # delete the db file so the next iteration has to write it again
- os.remove(db_file)
- # END for each randomization factor
-
-
- # multi-threaded mode
- # want two, should be supported by most of today's CPUs
- pool.set_size(2)
- total_kib = 0
- nsios = len(string_ios)
- for stream in string_ios:
- stream.seek(0)
- total_kib += len(stream.getvalue()) / 1000
- # END rewind
-
- def istream_iter():
- for stream in string_ios:
- stream.seek(0)
- yield IStream(str_blob_type, len(stream.getvalue()), stream)
- # END for each stream
- # END util
-
- # write multiple objects at once, involving concurrent compression
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- st = time()
- istreams = istream_reader.read(nsios)
- assert len(istreams) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # decompress multiple at once, by reading them
- # chunk size is not important, as the stream will not really be
- # decompressed until it's read
- istream_reader = IteratorReader(iter([ i.binsha for i in istreams ]))
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.task().max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # store the files, and read them back. For the reading, we use a task
- # as well which is chunked into one item per task. Reading all will
- # very quickly result in two threads handling two bytestreams of
- # chained compression/decompression streams
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- istream_to_sha = lambda items: [ i.binsha for i in items ]
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.task().max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
diff --git a/gitdb/test/test_base.py b/gitdb/test/test_base.py
index 1b20faf..fbe0135 100644
--- a/gitdb/test/test_base.py
+++ b/gitdb/test/test_base.py
@@ -3,96 +3,5 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
-from lib import (
- TestBase,
- DummyStream,
- DeriveTest,
- )
-from gitdb import *
-from gitdb.util import (
- NULL_BIN_SHA
- )
-from gitdb.typ import (
- str_blob_type
- )
-
-
-class TestBaseTypes(TestBase):
-
- def test_streams(self):
- # test info
- sha = NULL_BIN_SHA
- s = 20
- blob_id = 3
-
- info = OInfo(sha, str_blob_type, s)
- assert info.binsha == sha
- assert info.type == str_blob_type
- assert info.type_id == blob_id
- assert info.size == s
-
- # test pack info
- # provides type_id
- pinfo = OPackInfo(0, blob_id, s)
- assert pinfo.type == str_blob_type
- assert pinfo.type_id == blob_id
- assert pinfo.pack_offset == 0
-
- dpinfo = ODeltaPackInfo(0, blob_id, s, sha)
- assert dpinfo.type == str_blob_type
- assert dpinfo.type_id == blob_id
- assert dpinfo.delta_info == sha
- assert dpinfo.pack_offset == 0
-
-
- # test ostream
- stream = DummyStream()
- ostream = OStream(*(info + (stream, )))
- assert ostream.stream is stream
- ostream.read(15)
- stream._assert()
- assert stream.bytes == 15
- ostream.read(20)
- assert stream.bytes == 20
-
- # test packstream
- postream = OPackStream(*(pinfo + (stream, )))
- assert postream.stream is stream
- postream.read(10)
- stream._assert()
- assert stream.bytes == 10
-
- # test deltapackstream
- dpostream = ODeltaPackStream(*(dpinfo + (stream, )))
- assert dpostream.stream is stream
- dpostream.read(5)
- stream._assert()
- assert stream.bytes == 5
-
- # derive with own args
- DeriveTest(sha, str_blob_type, s, stream, 'mine', myarg=3)._assert()
-
- # test istream
- istream = IStream(str_blob_type, s, stream)
- assert istream.binsha == None
- istream.binsha = sha
- assert istream.binsha == sha
-
- assert len(istream.binsha) == 20
- assert len(istream.hexsha) == 40
-
- assert istream.size == s
- istream.size = s * 2
- assert istream.size == s * 2
- assert istream.type == str_blob_type
- istream.type = "something"
- assert istream.type == "something"
- assert istream.stream is stream
- istream.stream = None
- assert istream.stream is None
-
- assert istream.error is None
- istream.error = Exception()
- assert isinstance(istream.error, Exception)
diff --git a/gitdb/test/test_config.py b/gitdb/test/test_config.py
index 205f3c2..8b13789 100644
--- a/gitdb/test/test_config.py
+++ b/gitdb/test/test_config.py
@@ -1,102 +1 @@
-# test_config.py
-# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
-#
-# This module is part of GitPython and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from gitdb.test.lib import *
-from gitdb.config import *
-import StringIO
-from copy import copy
-from ConfigParser import NoSectionError
-
-class TestConfig(TestBase):
-
- def _to_memcache(self, file_path):
- fp = open(file_path, "r")
- sio = StringIO.StringIO(fp.read())
- sio.name = file_path
- return sio
-
- def _parsers_equal_or_raise(self, lhs, rhs):
- pass
-
- def test_read_write(self):
- # writer must create the exact same file as the one read before
- for filename in ("git_config", "git_config_global"):
- file_obj = self._to_memcache(fixture_path(filename))
- file_obj_orig = copy(file_obj)
- w_config = GitConfigParser(file_obj, read_only = False)
- w_config.read() # enforce reading
- assert w_config._sections
- w_config.write() # enforce writing
- assert file_obj.getvalue() == file_obj_orig.getvalue()
-
- # creating an additional config writer must fail due to exclusive access
- self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
-
- # should still have a lock and be able to make changes
- assert w_config._lock._has_lock()
-
- # changes should be written right away
- sname = "my_section"
- oname = "mykey"
- val = "myvalue"
- w_config.add_section(sname)
- assert w_config.has_section(sname)
- w_config.set(sname, oname, val)
- assert w_config.has_option(sname,oname)
- assert w_config.get(sname, oname) == val
-
- sname_new = "new_section"
- oname_new = "new_key"
- ival = 10
- w_config.set_value(sname_new, oname_new, ival)
- assert w_config.get_value(sname_new, oname_new) == ival
-
- file_obj.seek(0)
- r_config = GitConfigParser(file_obj, read_only=True)
- assert r_config.has_section(sname)
- assert r_config.has_option(sname, oname)
- assert r_config.get(sname, oname) == val
-
- # END for each filename
-
- def test_base(self):
- path_repo = fixture_path("git_config")
- path_global = fixture_path("git_config_global")
- r_config = GitConfigParser([path_repo, path_global], read_only=True)
- assert r_config.read_only
- num_sections = 0
- num_options = 0
-
- # test reader methods
- assert r_config._is_initialized == False
- for section in r_config.sections():
- num_sections += 1
- for option in r_config.options(section):
- num_options += 1
- val = r_config.get(section, option)
- val_typed = r_config.get_value(section, option)
- assert isinstance(val_typed, (bool, long, float, basestring))
- assert val
- assert "\n" not in option
- assert "\n" not in val
-
- # writing must fail
- self.failUnlessRaises(IOError, r_config.set, section, option, None)
- self.failUnlessRaises(IOError, r_config.remove_option, section, option )
- # END for each option
- self.failUnlessRaises(IOError, r_config.remove_section, section)
- # END for each section
- assert num_sections and num_options
- assert r_config._is_initialized == True
-
- # get a value which doesn't exist, with a default
- default = "my default value"
- assert r_config.get_value("doesnt", "exist", default) == default
-
- # it raises if there is no default though
- self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
-
-
diff --git a/gitdb/test/test_example.py b/gitdb/test/test_example.py
index c2e7840..8b13789 100644
--- a/gitdb/test/test_example.py
+++ b/gitdb/test/test_example.py
@@ -1,64 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Module with examples from the tutorial section of the docs"""
-from lib import *
-from gitdb import IStream
-from gitdb.db.py import PureLooseObjectODB
-from gitdb.util import pool
-
-from cStringIO import StringIO
-from async import IteratorReader
-
-class TestExamples(TestBase):
-
- def test_base(self):
- ldb = PureLooseObjectODB(fixture_path("../../../.git/objects"))
-
- for sha1 in ldb.sha_iter():
- oinfo = ldb.info(sha1)
- ostream = ldb.stream(sha1)
- assert oinfo[:3] == ostream[:3]
-
- assert len(ostream.read()) == ostream.size
- assert ldb.has_object(oinfo.binsha)
- # END for each sha in database
- # assure we close all files
- try:
- del(ostream)
- del(oinfo)
- except UnboundLocalError:
- pass
- # END ignore exception if there are no loose objects
-
- data = "my data"
- istream = IStream("blob", len(data), StringIO(data))
-
- # the object does not yet have a sha
- assert istream.binsha is None
- ldb.store(istream)
- # now the sha is set
- assert len(istream.binsha) == 20
- assert ldb.has_object(istream.binsha)
-
-
- # async operation
- # Create a reader from an iterator
- reader = IteratorReader(ldb.sha_iter())
-
- # get reader for object streams
- info_reader = ldb.stream_async(reader)
-
- # read one
- info = info_reader.read(1)[0]
-
- # read all the rest until depletion
- ostreams = info_reader.read()
-
- # set the pool to use two threads
- pool.set_size(2)
-
- # synchronize the mode of operation
- pool.set_size(0)
diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py
index 4a7f1ca..8b13789 100644
--- a/gitdb/test/test_pack.py
+++ b/gitdb/test/test_pack.py
@@ -1,247 +1 @@
-# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
-#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-"""Test everything about packs reading and writing"""
-from lib import (
- TestBase,
- with_rw_directory,
- with_packs_rw,
- fixture_path
- )
-from gitdb.stream import DeltaApplyReader
-from gitdb.pack import (
- PackEntity,
- PackIndexFile,
- PackFile
- )
-
-from gitdb.base import (
- OInfo,
- OStream,
- )
-
-from gitdb.fun import delta_types
-from gitdb.exc import UnsupportedOperation
-from gitdb.util import to_bin_sha
-from itertools import izip, chain
-from nose import SkipTest
-
-import os
-import sys
-import tempfile
-
-
-#{ Utilities
-def bin_sha_from_filename(filename):
- return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
-#} END utilities
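-# For instance (illustrative only): for 'pack-&lt;40 hex chars&gt;.idx', the basename
-# is split from its extension and the 5-character 'pack-' prefix is stripped,
-# leaving the 40 hex characters to convert to a binary sha.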
-
-class TestPack(TestBase):
-
- packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
- packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
- packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
- packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
- packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
- packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
-
-
- def _assert_index_file(self, index, version, size):
- assert index.packfile_checksum() != index.indexfile_checksum()
- assert len(index.packfile_checksum()) == 20
- assert len(index.indexfile_checksum()) == 20
- assert index.version() == version
- assert index.size() == size
- assert len(index.offsets()) == size
-
- # get all data of all objects
- for oidx in xrange(index.size()):
- sha = index.sha(oidx)
- assert oidx == index.sha_to_index(sha)
-
- entry = index.entry(oidx)
- assert len(entry) == 3
-
- assert entry[0] == index.offset(oidx)
- assert entry[1] == sha
- assert entry[2] == index.crc(oidx)
-
- # verify partial sha
- for l in (4,8,11,17,20):
- assert index.partial_sha_to_index(sha[:l], l*2) == oidx
-
- # END for each object index in indexfile
- self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
-
-
- def _assert_pack_file(self, pack, version, size):
- assert pack.version() == 2
- assert pack.size() == size
- assert len(pack.checksum()) == 20
-
- num_obj = 0
- for obj in pack.stream_iter():
- num_obj += 1
- info = pack.info(obj.pack_offset)
- stream = pack.stream(obj.pack_offset)
-
- assert info.pack_offset == stream.pack_offset
- assert info.type_id == stream.type_id
- assert hasattr(stream, 'read')
-
- # it should be possible to read from both streams
- assert obj.read() == stream.read()
-
- streams = pack.collect_streams(obj.pack_offset)
- assert streams
-
- # read the stream
- try:
- dstream = DeltaApplyReader.new(streams)
- except ValueError:
- # ignore these, old git versions use only ref deltas,
- # which we haven't resolved (as we are without an index)
- # Also ignore non-delta streams
- continue
- # END get deltastream
-
- # read all
- data = dstream.read()
- assert len(data) == dstream.size
-
- # test seek
- dstream.seek(0)
- assert dstream.read() == data
-
-
- # read chunks
- # NOTE: the current implementation is safe, it basically transfers
- # all calls to the underlying memory map
-
- # END for each object
- assert num_obj == size
-
-
- def test_pack_index(self):
- # check version 1 and 2
- for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
- index = PackIndexFile(indexfile)
- self._assert_index_file(index, version, size)
- # END run tests
-
- def test_pack(self):
- # there is this special version 3, but apparently it's like 2 ...
- for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
- pack = PackFile(packfile)
- self._assert_pack_file(pack, version, size)
- # END for each pack to test
-
- @with_rw_directory
- def test_pack_entity(self, rw_dir):
- pack_objs = list()
- for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
- (self.packfile_v2_2, self.packindexfile_v2),
- (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
- packfile, version, size = packinfo
- indexfile, version, size = indexinfo
- entity = PackEntity(packfile)
- assert entity.pack().path() == packfile
- assert entity.index().path() == indexfile
- pack_objs.extend(entity.stream_iter())
-
- count = 0
- for info, stream in izip(entity.info_iter(), entity.stream_iter()):
- count += 1
- assert info.binsha == stream.binsha
- assert len(info.binsha) == 20
- assert info.type_id == stream.type_id
- assert info.size == stream.size
-
- # we return fully resolved items, which is implied by the sha-centric access
- assert not info.type_id in delta_types
-
- # try all calls
- assert len(entity.collect_streams(info.binsha))
- oinfo = entity.info(info.binsha)
- assert isinstance(oinfo, OInfo)
- assert oinfo.binsha is not None
- ostream = entity.stream(info.binsha)
- assert isinstance(ostream, OStream)
- assert ostream.binsha is not None
-
- # verify the stream
- try:
- assert entity.is_valid_stream(info.binsha, use_crc=True)
- except UnsupportedOperation:
- pass
- # END ignore version issues
- assert entity.is_valid_stream(info.binsha, use_crc=False)
- # END for each info, stream tuple
- assert count == size
-
- # END for each entity
-
- # pack writing - write all packs into one
- # index path can be None
- pack_path = tempfile.mktemp('', "pack", rw_dir)
- index_path = tempfile.mktemp('', 'index', rw_dir)
- iteration = 0
- def rewind_streams():
- for obj in pack_objs:
- obj.stream.seek(0)
- #END utility
- for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
- pfile = open(ppath, 'wb')
- iwrite = None
- if ipath:
- ifile = open(ipath, 'wb')
- iwrite = ifile.write
- #END handle ip
-
- # make sure we rewind the streams ... we work on the same objects over and over again
- if iteration > 0:
- rewind_streams()
- #END rewind streams
- iteration += 1
-
- pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
- pfile.close()
- assert os.path.getsize(ppath) > 100
-
- # verify pack
- pf = PackFile(ppath)
- assert pf.size() == len(pack_objs)
- assert pf.version() == PackFile.pack_version_default
- assert pf.checksum() == pack_sha
-
- # verify index
- if ipath is not None:
- ifile.close()
- assert os.path.getsize(ipath) > 100
- idx = PackIndexFile(ipath)
- assert idx.version() == PackIndexFile.index_version_default
- assert idx.packfile_checksum() == pack_sha
- assert idx.indexfile_checksum() == index_sha
- assert idx.size() == len(pack_objs)
- #END verify files exist
- #END for each packpath, indexpath pair
-
- # verify the packs thoroughly
- rewind_streams()
- entity = PackEntity.create(pack_objs, rw_dir)
- count = 0
- for info in entity.info_iter():
- count += 1
- for use_crc in range(2):
- assert entity.is_valid_stream(info.binsha, use_crc)
- # END for each crc mode
- #END for each info
- assert count == len(pack_objs)
-
-
- def test_pack_64(self):
- # TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
- # of course without really needing such a huge pack
- raise SkipTest()
diff --git a/gitdb/test/test_refs.py b/gitdb/test/test_refs.py
index 2c571b7..649542f 100644
--- a/gitdb/test/test_refs.py
+++ b/gitdb/test/test_refs.py
@@ -158,26 +158,25 @@ class TestRefs(TestBase):
cur_head = HEAD(rw_repo)
old_head_commit = cur_head.commit
new_head_commit = cur_head.ref.commit.parents[0]
- if False: #TODO get reset checking back into the game
- cur_head.reset(new_head_commit, index=True) # index only
- assert cur_head.reference.commit == new_head_commit
-
- self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
- new_head_commit = new_head_commit.parents[0]
- cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
- assert cur_head.reference.commit == new_head_commit
-
- # paths - make sure we have something to do
- rw_repo.index.reset(old_head_commit.parents[0])
- cur_head.reset(cur_head, paths = "test")
- cur_head.reset(new_head_commit, paths = "lib")
- # hard resets with paths don't work, it's all or nothing
- self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
-
- # we can do a mixed reset, and then checkout from the index though
- cur_head.reset(new_head_commit)
- rw_repo.index.checkout(["lib"], force=True)
- #END ignore block
+
+ cur_head.reset(new_head_commit, index=True) # index only
+ assert cur_head.reference.commit == new_head_commit
+
+ self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
+ new_head_commit = new_head_commit.parents[0]
+ cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
+ assert cur_head.reference.commit == new_head_commit
+
+ # paths - make sure we have something to do
+ rw_repo.index.reset(old_head_commit.parents[0])
+ cur_head.reset(cur_head, paths = "test")
+ cur_head.reset(new_head_commit, paths = "lib")
+ # hard resets with paths don't work, it's all or nothing
+ self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
+
+ # we can do a mixed reset, and then checkout from the index though
+ cur_head.reset(new_head_commit)
+ rw_repo.index.checkout(["lib"], force=True)
# now that we have a writable repo, change the HEAD reference - it's
# like git-reset --soft
diff --git a/gitdb/test/test_util.py b/gitdb/test/test_util.py
index 0847ab5..53e2569 100644
--- a/gitdb/test/test_util.py
+++ b/gitdb/test/test_util.py
@@ -3,131 +3,8 @@
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
-import tempfile
-import os
-from lib import TestBase
-from gitdb.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD,
- Actor
- )
class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
- assert len(to_bin_sha(NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_lockedfd(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
- # cannot end before it was started
- self.failUnlessRaises(AssertionError, lfd.rollback)
- self.failUnlessRaises(AssertionError, lfd.commit)
-
- # open for writing
- assert not os.path.isfile(lockfilepath)
- wfd = lfd.open(write=True)
- assert lfd._fd is wfd
- assert os.path.isfile(lockfilepath)
-
- # write data and fail
- os.write(wfd, new_data)
- lfd.rollback()
- assert lfd._fd is None
- self._cmp_contents(my_file, orig_data)
- assert not os.path.isfile(lockfilepath)
-
- # additional call doesn't fail
- lfd.commit()
- lfd.rollback()
-
- # test reading
- lfd = LockedFD(my_file)
- rfd = lfd.open(write=False)
- assert os.read(rfd, len(orig_data)) == orig_data
-
- assert os.path.isfile(lockfilepath)
- # deletion rolls back
- del(lfd)
- assert not os.path.isfile(lockfilepath)
-
-
- # write data - concurrently
- lfd = LockedFD(my_file)
- olfd = LockedFD(my_file)
- assert not os.path.isfile(lockfilepath)
- wfdstream = lfd.open(write=True, stream=True) # this time as stream
- assert os.path.isfile(lockfilepath)
- # another one fails
- self.failUnlessRaises(IOError, olfd.open)
-
- wfdstream.write(new_data)
- lfd.commit()
- assert not os.path.isfile(lockfilepath)
- self._cmp_contents(my_file, new_data)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
- # try non-existing file for reading
- lfd = LockedFD(tempfile.mktemp())
- try:
- lfd.open(write=False)
- except OSError:
- assert not os.path.exists(lfd._lockfilepath())
- else:
- self.fail("expected OSError")
- # END handle exceptions
-
-
-class TestActor(TestBase):
- def test_from_string_should_separate_name_and_email(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert "Michael Trier" == a.name
- assert "mtrier@example.com" == a.email
-
- # base type capabilities
- assert a == a
- assert not ( a != a )
- m = set()
- m.add(a)
- m.add(a)
- assert len(m) == 1
-
- def test_from_string_should_handle_just_name(self):
- a = Actor._from_string("Michael Trier")
- assert "Michael Trier" == a.name
- assert None == a.email
-
- def test_should_display_representation(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert '<git.Actor "Michael Trier <mtrier@example.com>">' == repr(a)
-
- def test_str_should_alias_name(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert a.name == str(a)
+
diff --git a/gitdb/util.py b/gitdb/util.py
index 75f41aa..3dc30d8 100644
--- a/gitdb/util.py
+++ b/gitdb/util.py
@@ -1,744 +1,3 @@
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
-# This module is part of GitDB and is released under
-# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-import platform
-import binascii
-import os
-import mmap
-import sys
-import errno
-import re
-import time
-from cStringIO import StringIO
-
-# in py 2.4, StringIO is only StringI, without write support.
-# Hence we must use the python implementation for this
-if sys.version_info[1] < 5:
- from StringIO import StringIO
-# END handle python 2.4
-
-try:
- import async.mod.zlib as zlib
-except ImportError:
- import zlib
-# END try async zlib
-
-from async import ThreadPool
-
-try:
- import hashlib
-except ImportError:
- import sha
-
-try:
- from struct import unpack_from
-except ImportError:
- from struct import unpack, calcsize
- __calcsize_cache = dict()
- def unpack_from(fmt, data, offset=0):
- try:
- size = __calcsize_cache[fmt]
- except KeyError:
- size = calcsize(fmt)
- __calcsize_cache[fmt] = size
- # END exception handling
- return unpack(fmt, data[offset : offset + size])
- # END own unpack_from implementation
-
-
-#{ Globals
-
-# A pool distributing tasks, initially with zero threads, hence everything
-# will be handled in the main thread
-pool = ThreadPool(0)
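-# Callers may enable real concurrency via pool.set_size(n); setting the size
-# back to 0 reverts to synchronous operation in the calling thread (see
-# gitdb/test/test_example.py for a usage sketch).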
-
-#} END globals
-
-
-#{ Aliases
-
-hex_to_bin = binascii.a2b_hex
-bin_to_hex = binascii.b2a_hex
-
-# errors
-ENOENT = errno.ENOENT
-
-# os shortcuts
-exists = os.path.exists
-mkdir = os.mkdir
-chmod = os.chmod
-isdir = os.path.isdir
-isfile = os.path.isfile
-rename = os.rename
-remove = os.remove
-dirname = os.path.dirname
-basename = os.path.basename
-normpath = os.path.normpath
-expandvars = os.path.expandvars
-expanduser = os.path.expanduser
-abspath = os.path.abspath
-join = os.path.join
-read = os.read
-write = os.write
-close = os.close
-fsync = os.fsync
-
-# constants
-NULL_HEX_SHA = "0"*40
-NULL_BIN_SHA = "\0"*20
-
-#} END Aliases
-
-#{ compatibility stuff ...
-
-class _RandomAccessStringIO(object):
- """Wrapper to provide required functionality in case memory maps cannot or may
- not be used. This is only really required in python 2.4"""
- __slots__ = '_sio'
-
- def __init__(self, buf=''):
- self._sio = StringIO(buf)
-
- def __getattr__(self, attr):
- return getattr(self._sio, attr)
-
- def __len__(self):
- return len(self.getvalue())
-
- def __getitem__(self, i):
- return self.getvalue()[i]
-
- def __getslice__(self, start, end):
- return self.getvalue()[start:end]
-
-#} END compatibility stuff ...
-
-#{ Routines
-
-def get_user_id():
- """:return: string identifying the currently active system user as name@node
- :note: user can be set with the 'USER' environment variable, usually set on windows"""
- ukn = 'UNKNOWN'
- username = os.environ.get('USER', os.environ.get('USERNAME', ukn))
- if username == ukn and hasattr(os, 'getlogin'):
- username = os.getlogin()
- # END get username from login
- return "%s@%s" % (username, platform.node())
-
-def is_git_dir(d):
- """ This is taken from the git setup.c:is_git_directory
- function."""
- if isdir(d) and \
- isdir(join(d, 'objects')) and \
- isdir(join(d, 'refs')):
- headref = join(d, 'HEAD')
- return isfile(headref) or \
- (os.path.islink(headref) and
- os.readlink(headref).startswith('refs'))
- return False
-
-
-def stream_copy(source, destination, chunk_size=512*1024):
- """Copy all data from the source stream into the destination stream in chunks
- of size chunk_size
-
- :return: amount of bytes written"""
- br = 0
- while True:
- chunk = source.read(chunk_size)
- destination.write(chunk)
- br += len(chunk)
- if len(chunk) < chunk_size:
- break
- # END reading output stream
- return br
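-
-# A minimal usage sketch (illustrative only, not part of the original module):
-def _example_stream_copy():
-    """Copy 4 KiB between two in-memory streams and return the byte count."""
-    source = StringIO('x' * 4096)
-    destination = StringIO()
-    return stream_copy(source, destination)    # -> 4096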
-
-def make_sha(source=''):
- """A python2.4 workaround for the sha/hashlib module fiasco
- :note: From the dulwich project """
- try:
- return hashlib.sha1(source)
- except NameError:
- sha1 = sha.sha(source)
- return sha1
-
-def allocate_memory(size):
- """:return: a file-protocol accessible memory block of the given size"""
- if size == 0:
- return _RandomAccessStringIO('')
- # END handle empty chunks gracefully
-
- try:
- return mmap.mmap(-1, size) # read-write by default
- except EnvironmentError:
- # setup real memory instead
- # this of course may fail if the amount of memory is not available in
- # one chunk - would only be the case in python 2.4, being more likely on
- # 32 bit systems.
- return _RandomAccessStringIO("\0"*size)
- # END handle memory allocation
-
-
-def file_contents_ro(fd, stream=False, allow_mmap=True):
- """:return: read-only contents of the file represented by the file descriptor fd
- :param fd: file descriptor opened for reading
- :param stream: if False, random access is provided, otherwise the stream interface
- is provided.
- :param allow_mmap: if True, its allowed to map the contents into memory, which
- allows large files to be handled and accessed efficiently. The file-descriptor
- will change its position if this is False"""
- try:
- if allow_mmap:
- # supports stream and random access
- try:
- return mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
- except EnvironmentError:
- # python 2.4 issue, 0 wants to be the actual size
- return mmap.mmap(fd, os.fstat(fd).st_size, access=mmap.ACCESS_READ)
- # END handle python 2.4
- except OSError:
- pass
- # END exception handling
-
- # read manually
- contents = os.read(fd, os.fstat(fd).st_size)
- if stream:
- return _RandomAccessStringIO(contents)
- return contents
-
-def file_contents_ro_filepath(filepath, stream=False, allow_mmap=True, flags=0):
- """Get the file contents at filepath as fast as possible
- :return: random access compatible memory of the given filepath
- :param stream: see ``file_contents_ro``
- :param allow_mmap: see ``file_contents_ro``
- :param flags: additional flags to pass to os.open
- :raise OSError: If the file could not be opened
- :note: for now we don't try to use O_NOATIME directly as the right value needs to be
- shared per database in fact. It only makes a real difference for loose object
- databases anyway, and they use it with the help of the ``flags`` parameter"""
- fd = os.open(filepath, os.O_RDONLY|getattr(os, 'O_BINARY', 0)|flags)
- try:
- return file_contents_ro(fd, stream, allow_mmap)
- finally:
- close(fd)
- # END assure file is closed
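-
-# Illustrative sketch only: the returned memory supports random access, so it
-# can be measured and sliced like a string.
-def _example_read_ro(filepath):
-    data = file_contents_ro_filepath(filepath)
-    return len(data), data[:4]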
-
-def to_hex_sha(sha):
- """:return: hexified version of sha"""
- if len(sha) == 40:
- return sha
- return bin_to_hex(sha)
-
-def to_bin_sha(sha):
- if len(sha) == 20:
- return sha
- return hex_to_bin(sha)
-
-def join_path(a, *p):
- """Join path tokens together similar to os.path.join, but always use
- '/' instead of possibly '\' on windows."""
- path = a
- for b in p:
- if b.startswith('/'):
- path += b[1:]
- elif path == '' or path.endswith('/'):
- path += b
- else:
- path += '/' + b
- return path
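-
-# For instance (illustrative only):
-#
-#   join_path('refs', 'heads', 'master')    # -> 'refs/heads/master'
-#   join_path('refs/', 'heads')             # -> 'refs/heads'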
-
-def to_native_path_windows(path):
- return path.replace('/','\\')
-
-def to_native_path_linux(path):
- return path.replace('\\','/')
-
-
-if sys.platform.startswith('win'):
- to_native_path = to_native_path_windows
-else:
- # no need for any work on linux
- def to_native_path_linux(path):
- return path
- to_native_path = to_native_path_linux
-
-def join_path_native(a, *p):
- """
- Like join_path, but makes sure an OS native path is returned. This is only
- needed to play it safe on my dear windows and to assure nice paths that only
- use '\'"""
- return to_native_path(join_path(a, *p))
-
-def assure_directory_exists(path, is_file=False):
- """Assure that the directory pointed to by path exists.
-
- :param is_file: If True, path is assumed to be a file and its containing
- directory is assured to exist. Otherwise path must itself be a directory
- :return: True if the directory was created, False if it already existed"""
- if is_file:
- path = os.path.dirname(path)
- #END handle file
- if not os.path.isdir(path):
- os.makedirs(path)
- return True
- return False
-
-
-#} END routines
-
-
-#{ Utilities
-
-class LazyMixin(object):
- """
- Base class providing an interface to lazily retrieve attribute values upon
- first access. If slots are used, memory will only be reserved once the attribute
- is actually accessed and retrieved the first time. All future accesses will
- return the cached value as stored in the Instance's dict or slot.
- """
-
- __slots__ = tuple()
-
- def __getattr__(self, attr):
- """
- Whenever an attribute is requested that we do not know, we allow it
- to be created and set. Next time the same attribute is requested, it is simply
- returned from our dict/slots. """
- self._set_cache_(attr)
- # will raise in case the cache was not created
- return object.__getattribute__(self, attr)
-
- def _set_cache_(self, attr):
- """
- This method should be overridden in the derived class.
- It should check whether the attribute named by attr can be created
- and cached. If the attribute is unknown, do nothing or delegate to the base class
-
- The derived class may create as many additional attributes as it deems
- necessary in case a git command returns more information than represented
- in the single attribute."""
- pass
-
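-# A minimal subclass sketch (illustrative only, not part of gitdb): 'size' is
-# computed on first access and served from the instance dict afterwards.
-class _ExampleLazy(LazyMixin):
-    def _set_cache_(self, attr):
-        if attr == 'size':
-            self.size = 42
-        else:
-            super(_ExampleLazy, self)._set_cache_(attr)
-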
-
-class LockedFD(object):
- """
- This class facilitates a safe read and write operation to a file on disk.
- If we write to 'file', we obtain a lock file at 'file.lock' and write to
- that instead. If we succeed, the lock file will be renamed to overwrite
- the original file.
-
- When reading, we obtain a lock file as well, to prevent other writers from
- succeeding while we are reading the file.
-
- This type handles errors correctly in that it will assure a consistent state
- on destruction.
-
- :note: with this setup, parallel reading is not possible"""
- __slots__ = ("_filepath", '_fd', '_write')
-
- def __init__(self, filepath):
- """Initialize an instance with the givne filepath"""
- self._filepath = filepath
- self._fd = None
- self._write = None # if True, we write a file
-
- def __del__(self):
- # will do nothing if the file descriptor is already closed
- if self._fd is not None:
- self.rollback()
-
- def _lockfilepath(self):
- return "%s.lock" % self._filepath
-
- def open(self, write=False, stream=False):
- """
- Open the file descriptor for reading or writing, both in binary mode.
-
- :param write: if True, the file descriptor will be opened for writing.
- Otherwise it will be opened read-only.
- :param stream: if True, the file descriptor will be wrapped into a simple stream
- object which supports only reading or writing
- :return: fd to read from or write to. It is still maintained by this instance
- and must not be closed directly
- :raise IOError: if the lock could not be retrieved
- :raise OSError: If the actual file could not be opened for reading
- :note: must only be called once"""
- if self._write is not None:
- raise AssertionError("Called %s multiple times" % self.open)
-
- self._write = write
-
- # try to open the lock file
- binary = getattr(os, 'O_BINARY', 0)
- lockmode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | binary
- try:
- fd = os.open(self._lockfilepath(), lockmode, 0600)
- if not write:
- os.close(fd)
- else:
- self._fd = fd
- # END handle file descriptor
- except OSError:
- raise IOError("Lock at %r could not be obtained" % self._lockfilepath())
- # END handle lock retrieval
-
- # open actual file if required
- if self._fd is None:
- # we could specify exclusive here, as we obtained the lock anyway
- try:
- self._fd = os.open(self._filepath, os.O_RDONLY | binary)
- except:
- # assure we release our lockfile
- os.remove(self._lockfilepath())
- raise
- # END handle lockfile
- # END open descriptor for reading
-
- if stream:
- # need delayed import
- from stream import FDStream
- return FDStream(self._fd)
- else:
- return self._fd
- # END handle stream
-
- def commit(self):
- """When done writing, call this function to commit your changes into the
- actual file.
- The file descriptor will be closed, and the lockfile handled.
- :note: can be called multiple times"""
- self._end_writing(successful=True)
-
- def rollback(self):
- """Abort your operation without any changes. The file descriptor will be
- closed, and the lock released.
- :note: can be called multiple times"""
- self._end_writing(successful=False)
-
- def _end_writing(self, successful=True):
- """Handle the lock according to the write mode """
- if self._write is None:
- raise AssertionError("Cannot end operation if it wasn't started yet")
-
- if self._fd is None:
- return
-
- os.close(self._fd)
- self._fd = None
-
- lockfile = self._lockfilepath()
- if self._write and successful:
- # on windows, rename does not silently overwrite the existing one
- if sys.platform == "win32":
- if isfile(self._filepath):
- os.remove(self._filepath)
- # END remove if exists
- # END win32 special handling
- os.rename(lockfile, self._filepath)
-
- # assure others can at least read the file - the tmpfile left it at rw--
- # We may also write that file, on windows that boils down to a remove-
- # protection as well
- chmod(self._filepath, 0644)
- else:
- # just delete the file so far, we failed
- os.remove(lockfile)
- # END successful handling
-
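-# A minimal write sketch (illustrative only): data goes to 'path.lock' first
-# and only replaces 'path' once commit() is called; rollback() discards it.
-def _example_lockedfd_write(path, data):
-    lfd = LockedFD(path)
-    fd = lfd.open(write=True)
-    os.write(fd, data)
-    lfd.commit()
-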
-
-class LockFile(object):
- """Provides methods to obtain, check for, and release a file based lock which
- should be used to handle concurrent access to the same file.
-
- As we are a utility class to be derived from, we only use protected methods.
-
- Locks will automatically be released on destruction"""
- __slots__ = ("_file_path", "_owns_lock")
-
- def __init__(self, file_path):
- self._file_path = file_path
- self._owns_lock = False
-
- def __del__(self):
- self._release_lock()
-
- def _lock_file_path(self):
- """:return: Path to lockfile"""
- return "%s.lock" % (self._file_path)
-
- def _has_lock(self):
- """:return: True if we have a lock and if the lockfile still exists
- :raise AssertionError: if our lock-file does not exist"""
- if not self._owns_lock:
- return False
-
- return True
-
- def _obtain_lock_or_raise(self):
- """Create a lock file as flag for other instances, mark our instance as lock-holder
-
- :raise IOError: if a lock was already present or a lock file could not be written"""
- if self._has_lock():
- return
- lock_file = self._lock_file_path()
- if os.path.isfile(lock_file):
- raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" % (self._file_path, lock_file))
-
- try:
- fd = os.open(lock_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0)
- os.close(fd)
- except OSError,e:
- raise IOError(str(e))
-
- self._owns_lock = True
-
- def _obtain_lock(self):
- """The default implementation will raise if a lock cannot be obtained.
- Subclasses may override this method to provide a different implementation"""
- return self._obtain_lock_or_raise()
-
- def _release_lock(self):
- """Release our lock if we have one"""
- if not self._has_lock():
- return
-
- # if someone removed our file beforehand, let's just flag this issue
- # instead of failing, to make it more usable.
- lfp = self._lock_file_path()
- try:
- # on bloody windows, the file needs write permissions to be removable.
- # Why ...
- if os.name == 'nt':
- os.chmod(lfp, 0777)
- # END handle win32
- os.remove(lfp)
- except OSError:
- pass
- self._owns_lock = False
-
-
-class BlockingLockFile(LockFile):
- """The lock file will block until a lock could be obtained, or fail after
- a specified timeout.
-
- :note: If the directory containing the lock was removed, an exception will
- be raised during the blocking period, preventing hangs as the lock
- can never be obtained."""
- __slots__ = ("_check_interval", "_max_block_time")
- def __init__(self, file_path, check_interval_s=0.3, max_block_time_s=sys.maxint):
- """Configure the instance
-
- :param check_interval_s:
- Period of time to sleep until the lock is checked the next time
-
- :param max_block_time_s: Maximum amount of seconds we may wait for the lock.
- By default, it waits a nearly unlimited amount of time"""
- super(BlockingLockFile, self).__init__(file_path)
- self._check_interval = check_interval_s
- self._max_block_time = max_block_time_s
-
- def _obtain_lock(self):
- """This method blocks until it obtained the lock, or raises IOError if
- it ran out of time or if the parent directory was not available anymore.
- If this method returns, you are guaranteed to own the lock"""
- starttime = time.time()
- maxtime = starttime + float(self._max_block_time)
- while True:
- try:
- super(BlockingLockFile, self)._obtain_lock()
- except IOError:
- # sanity check: if the directory leading to the lockfile is not
- # readable anymore, raise an exception
- curtime = time.time()
- if not os.path.isdir(os.path.dirname(self._lock_file_path())):
- msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (self._lock_file_path(), curtime - starttime)
- raise IOError(msg)
- # END handle missing directory
-
- if curtime >= maxtime:
- msg = "Waited %g seconds for lock at %r" % ( maxtime - starttime, self._lock_file_path())
- raise IOError(msg)
- # END abort if we wait too long
- time.sleep(self._check_interval)
- else:
- break
- # END endless loop
-
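-# Illustrative only: poll every 0.3 seconds and give up after at most ~10
-# seconds; _obtain_lock blocks and raises IOError on timeout.
-#
-#   lock = BlockingLockFile("some/file", check_interval_s=0.3, max_block_time_s=10)
-#   lock._obtain_lock()
-#   lock._release_lock()
-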
-
-class Actor(object):
- """Actors hold information about a person acting on the repository. They
- can be committers and authors or anything with a name and an email as
- mentioned in the git log entries."""
- # PRECOMPILED REGEX
- name_only_regex = re.compile( r'<(.+)>' )
- name_email_regex = re.compile( r'(.*) <(.+?)>' )
-
- # ENVIRONMENT VARIABLES
- # read when creating new commits
- env_author_name = "GIT_AUTHOR_NAME"
- env_author_email = "GIT_AUTHOR_EMAIL"
- env_committer_name = "GIT_COMMITTER_NAME"
- env_committer_email = "GIT_COMMITTER_EMAIL"
-
- # CONFIGURATION KEYS
- conf_name = 'name'
- conf_email = 'email'
-
- __slots__ = ('name', 'email')
-
- def __init__(self, name, email):
- self.name = name
- self.email = email
-
- def __eq__(self, other):
- return self.name == other.name and self.email == other.email
-
- def __ne__(self, other):
- return not (self == other)
-
- def __hash__(self):
- return hash((self.name, self.email))
-
- def __str__(self):
- return self.name
-
- def __repr__(self):
- return '<git.Actor "%s <%s>">' % (self.name, self.email)
-
- @classmethod
- def _from_string(cls, string):
- """Create an Actor from a string.
- :param string: the string, which is expected to be in regular git format:
-
- John Doe <jdoe@example.com>
-
- :return: Actor """
- m = cls.name_email_regex.search(string)
- if m:
- name, email = m.groups()
- return cls(name, email)
- else:
- m = cls.name_only_regex.search(string)
- if m:
- return cls(m.group(1), None)
- else:
- # assume best and use the whole string as name
- return cls(string, None)
- # END special case name
- # END handle name/email matching
-
- @classmethod
- def _main_actor(cls, env_name, env_email, config_reader=None):
- actor = cls('', '')
- default_email = get_user_id()
- default_name = default_email.split('@')[0]
-
- for attr, evar, cvar, default in (('name', env_name, cls.conf_name, default_name),
- ('email', env_email, cls.conf_email, default_email)):
- try:
- setattr(actor, attr, os.environ[evar])
- except KeyError:
- if config_reader is not None:
- setattr(actor, attr, config_reader.get_value('user', cvar, default))
- #END config-reader handling
- if not getattr(actor, attr):
- setattr(actor, attr, default)
- #END handle name
- #END for each item to retrieve
- return actor
-
-
- @classmethod
- def committer(cls, config_reader=None):
- """
- :return: Actor instance corresponding to the configured committer. It behaves
- similarly to the git implementation, in that the environment will override
- configuration values of config_reader. If no value is set at all, it will be
- generated
- :param config_reader: ConfigReader to use to retrieve the values from in case
- they are not set in the environment"""
- return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
-
- @classmethod
- def author(cls, config_reader=None):
- """Same as committer(), but defines the main author. It may be specified in the environment,
- but defaults to the committer"""
- return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
-
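-# Illustrative only: the environment overrides config_reader values, which in
-# turn override the generated name@node default.
-#
-#   os.environ[Actor.env_committer_name] = "Jane Doe"
-#   Actor.committer().name    # -> "Jane Doe"
-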
-
-class Iterable(object):
- """Defines an interface for iterable items which is to assure a uniform
- way to retrieve and iterate items within the git repository"""
- __slots__ = tuple()
- _id_attribute_ = "attribute that most suitably identifies your instance"
-
- @classmethod
- def list_items(cls, repo, *args, **kwargs):
- """
- Find all items of this type - subclasses can specify args and kwargs differently.
- If no additional arguments are given, subclasses are obliged to return all items.
-
- :note: Favor the iter_items method as it will avoid eagerly loading all
- items into memory
-
- :return: list(Item, ...) list of item instances"""
- out_list = IterableList( cls._id_attribute_ )
- out_list.extend(cls.iter_items(repo, *args, **kwargs))
- return out_list
-
-
- @classmethod
- def iter_items(cls, repo, *args, **kwargs):
- """For more information about the arguments, see list_items
- :return: iterator yielding Items"""
- raise NotImplementedError("To be implemented by Subclass")
-
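-# A minimal subclass sketch (illustrative only, not part of gitdb):
-class _ExampleItem(Iterable):
-    _id_attribute_ = 'name'
-    def __init__(self, name):
-        self.name = name
-    @classmethod
-    def iter_items(cls, repo, *args, **kwargs):
-        return iter([cls('a'), cls('b')])
-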
-
-class IterableList(list):
- """
- List of iterable objects allowing one to query an object by id or by named index::
-
- heads = repo.heads
- heads.master
- heads['master']
- heads[0]
-
- It requires an id_attribute name to be set which will be queried from its
- contained items to have a means for comparison.
-
- A prefix can be specified which is to be used in case the id returned by the
- items always contains a prefix that does not matter to the user, so it
- can be left out."""
- __slots__ = ('_id_attr', '_prefix')
-
- def __new__(cls, id_attr, prefix=''):
- return super(IterableList,cls).__new__(cls)
-
- def __init__(self, id_attr, prefix=''):
- self._id_attr = id_attr
- self._prefix = prefix
- if not isinstance(id_attr, basestring):
- raise ValueError("First parameter must be a string identifying the name-property. Extend the list after initialization")
- # END help debugging !
-
- def __getattr__(self, attr):
- attr = self._prefix + attr
- for item in self:
- if getattr(item, self._id_attr) == attr:
- return item
- # END for each item
- return list.__getattribute__(self, attr)
-
- def __getitem__(self, index):
- if isinstance(index, int):
- return list.__getitem__(self,index)
-
- try:
- return getattr(self, index)
- except AttributeError:
- raise IndexError( "No item found with id %r" % (self._prefix + index) )
-
-
-
-#} END utilities