diff options
Diffstat (limited to 'git/refs')
-rw-r--r-- | git/refs/__init__.py | 21 | ||||
-rw-r--r-- | git/refs/head.py | 246 | ||||
-rw-r--r-- | git/refs/log.py | 282 | ||||
-rw-r--r-- | git/refs/reference.py | 84 | ||||
-rw-r--r-- | git/refs/remote.py | 63 | ||||
-rw-r--r-- | git/refs/symbolic.py | 618 | ||||
-rw-r--r-- | git/refs/tag.py | 91 |
7 files changed, 1405 insertions, 0 deletions
diff --git a/git/refs/__init__.py b/git/refs/__init__.py new file mode 100644 index 00000000..fc8ce644 --- /dev/null +++ b/git/refs/__init__.py @@ -0,0 +1,21 @@ + +# import all modules in order, fix the names they require +from symbolic import * +from reference import * +from head import * +from tag import * +from remote import * + +# name fixes +import head +head.RemoteReference = RemoteReference +del(head) + + +import symbolic +for item in (HEAD, Head, RemoteReference, TagReference, Reference, SymbolicReference): + setattr(symbolic, item.__name__, item) +del(symbolic) + + +from log import * diff --git a/git/refs/head.py b/git/refs/head.py new file mode 100644 index 00000000..d8729434 --- /dev/null +++ b/git/refs/head.py @@ -0,0 +1,246 @@ +from symbolic import SymbolicReference +from reference import Reference + +from git.config import SectionConstraint + +from git.util import join_path + +from git.exc import GitCommandError + +__all__ = ["HEAD", "Head"] + + + +class HEAD(SymbolicReference): + """Special case of a Symbolic Reference as it represents the repository's + HEAD reference.""" + _HEAD_NAME = 'HEAD' + _ORIG_HEAD_NAME = 'ORIG_HEAD' + __slots__ = tuple() + + def __init__(self, repo, path=_HEAD_NAME): + if path != self._HEAD_NAME: + raise ValueError("HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path)) + super(HEAD, self).__init__(repo, path) + + def orig_head(self): + """ + :return: SymbolicReference pointing at the ORIG_HEAD, which is maintained + to contain the previous value of HEAD""" + return SymbolicReference(self.repo, self._ORIG_HEAD_NAME) + + def reset(self, commit='HEAD', index=True, working_tree = False, + paths=None, **kwargs): + """Reset our HEAD to the given commit optionally synchronizing + the index and working tree. The reference we refer to will be set to + commit as well. + + :param commit: + Commit object, Reference Object or string identifying a revision we + should reset HEAD to. + + :param index: + If True, the index will be set to match the given commit. Otherwise + it will not be touched. + + :param working_tree: + If True, the working tree will be forcefully adjusted to match the given + commit, possibly overwriting uncommitted changes without warning. + If working_tree is True, index must be true as well + + :param paths: + Single path or list of paths relative to the git root directory + that are to be reset. This allows to partially reset individual files. + + :param kwargs: + Additional arguments passed to git-reset. + + :return: self""" + mode = "--soft" + add_arg = None + if index: + mode = "--mixed" + + # it appears, some git-versions declare mixed and paths deprecated + # see http://github.com/Byron/GitPython/issues#issue/2 + if paths: + mode = None + # END special case + # END handle index + + if working_tree: + mode = "--hard" + if not index: + raise ValueError( "Cannot reset the working tree if the index is not reset as well") + + # END working tree handling + + if paths: + add_arg = "--" + # END nicely separate paths from rest + + try: + self.repo.git.reset(mode, commit, add_arg, paths, **kwargs) + except GitCommandError, e: + # git nowadays may use 1 as status to indicate there are still unstaged + # modifications after the reset + if e.status != 1: + raise + # END handle exception + + return self + + +class Head(Reference): + """A Head is a named reference to a Commit. Every Head instance contains a name + and a Commit object. + + Examples:: + + >>> repo = Repo("/path/to/repo") + >>> head = repo.heads[0] + + >>> head.name + 'master' + + >>> head.commit + <git.Commit "1c09f116cbc2cb4100fb6935bb162daa4723f455"> + + >>> head.commit.hexsha + '1c09f116cbc2cb4100fb6935bb162daa4723f455'""" + _common_path_default = "refs/heads" + k_config_remote = "remote" + k_config_remote_ref = "merge" # branch to merge from remote + + @classmethod + def delete(cls, repo, *heads, **kwargs): + """Delete the given heads + :param force: + If True, the heads will be deleted even if they are not yet merged into + the main development stream. + Default False""" + force = kwargs.get("force", False) + flag = "-d" + if force: + flag = "-D" + repo.git.branch(flag, *heads) + + def set_tracking_branch(self, remote_reference): + """ + Configure this branch to track the given remote reference. This will alter + this branch's configuration accordingly. + + :param remote_reference: The remote reference to track or None to untrack + any references + :return: self""" + if remote_reference is not None and not isinstance(remote_reference, RemoteReference): + raise ValueError("Incorrect parameter type: %r" % remote_reference) + # END handle type + + writer = self.config_writer() + if remote_reference is None: + writer.remove_option(self.k_config_remote) + writer.remove_option(self.k_config_remote_ref) + if len(writer.options()) == 0: + writer.remove_section() + # END handle remove section + else: + writer.set_value(self.k_config_remote, remote_reference.remote_name) + writer.set_value(self.k_config_remote_ref, Head.to_full_path(remote_reference.remote_head)) + # END handle ref value + + return self + + + def tracking_branch(self): + """ + :return: The remote_reference we are tracking, or None if we are + not a tracking branch""" + reader = self.config_reader() + if reader.has_option(self.k_config_remote) and reader.has_option(self.k_config_remote_ref): + ref = Head(self.repo, Head.to_full_path(reader.get_value(self.k_config_remote_ref))) + remote_refpath = RemoteReference.to_full_path(join_path(reader.get_value(self.k_config_remote), ref.name)) + return RemoteReference(self.repo, remote_refpath) + # END handle have tracking branch + + # we are not a tracking branch + return None + + def rename(self, new_path, force=False): + """Rename self to a new path + + :param new_path: + Either a simple name or a path, i.e. new_name or features/new_name. + The prefix refs/heads is implied + + :param force: + If True, the rename will succeed even if a head with the target name + already exists. + + :return: self + :note: respects the ref log as git commands are used""" + flag = "-m" + if force: + flag = "-M" + + self.repo.git.branch(flag, self, new_path) + self.path = "%s/%s" % (self._common_path_default, new_path) + return self + + def checkout(self, force=False, **kwargs): + """Checkout this head by setting the HEAD to this reference, by updating the index + to reflect the tree we point to and by updating the working tree to reflect + the latest index. + + The command will fail if changed working tree files would be overwritten. + + :param force: + If True, changes to the index and the working tree will be discarded. + If False, GitCommandError will be raised in that situation. + + :param kwargs: + Additional keyword arguments to be passed to git checkout, i.e. + b='new_branch' to create a new branch at the given spot. + + :return: + The active branch after the checkout operation, usually self unless + a new branch has been created. + + :note: + By default it is only allowed to checkout heads - everything else + will leave the HEAD detached which is allowed and possible, but remains + a special state that some tools might not be able to handle.""" + args = list() + kwargs['f'] = force + if kwargs['f'] == False: + kwargs.pop('f') + + self.repo.git.checkout(self, **kwargs) + return self.repo.active_branch + + #{ Configruation + + def _config_parser(self, read_only): + if read_only: + parser = self.repo.config_reader() + else: + parser = self.repo.config_writer() + # END handle parser instance + + return SectionConstraint(parser, 'branch "%s"' % self.name) + + def config_reader(self): + """ + :return: A configuration parser instance constrained to only read + this instance's values""" + return self._config_parser(read_only=True) + + def config_writer(self): + """ + :return: A configuration writer instance with read-and write acccess + to options of this head""" + return self._config_parser(read_only=False) + + #} END configuration + + diff --git a/git/refs/log.py b/git/refs/log.py new file mode 100644 index 00000000..f49c07fd --- /dev/null +++ b/git/refs/log.py @@ -0,0 +1,282 @@ +from git.util import ( + join_path, + Actor, + LockedFD, + LockFile, + assure_directory_exists, + to_native_path, + ) + +from gitdb.util import ( + bin_to_hex, + join, + file_contents_ro_filepath, + ) + +from git.objects.util import ( + parse_date, + Serializable, + utctz_to_altz, + altz_to_utctz_str, + ) + +import time +import os +import re + +__all__ = ["RefLog", "RefLogEntry"] + + +class RefLogEntry(tuple): + """Named tuple allowing easy access to the revlog data fields""" + _fmt = "%s %s %s <%s> %i %s\t%s\n" + _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$') + __slots__ = tuple() + + def __repr__(self): + """Representation of ourselves in git reflog format""" + act = self.actor + time = self.time + return self._fmt % (self.oldhexsha, self.newhexsha, act.name, act.email, + time[0], altz_to_utctz_str(time[1]), self.message) + + @property + def oldhexsha(self): + """The hexsha to the commit the ref pointed to before the change""" + return self[0] + + @property + def newhexsha(self): + """The hexsha to the commit the ref now points to, after the change""" + return self[1] + + @property + def actor(self): + """Actor instance, providing access""" + return self[2] + + @property + def time(self): + """time as tuple: + + * [0] = int(time) + * [1] = int(timezone_offset) in time.altzone format """ + return self[3] + + @property + def message(self): + """Message describing the operation that acted on the reference""" + return self[4] + + @classmethod + def new(self, oldhexsha, newhexsha, actor, time, tz_offset, message): + """:return: New instance of a RefLogEntry""" + if not isinstance(actor, Actor): + raise ValueError("Need actor instance, got %s" % actor) + # END check types + return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message)) + + @classmethod + def from_line(cls, line): + """:return: New RefLogEntry instance from the given revlog line. + :param line: line without trailing newline + :raise ValueError: If line could not be parsed""" + try: + info, msg = line.split('\t', 2) + except ValueError: + raise ValueError("line is missing tab separator") + #END handle first plit + oldhexsha = info[:40] + newhexsha = info[41:81] + for hexsha in (oldhexsha, newhexsha): + if not cls._re_hexsha_only.match(hexsha): + raise ValueError("Invalid hexsha: %s" % hexsha) + # END if hexsha re doesn't match + #END for each hexsha + + email_end = info.find('>', 82) + if email_end == -1: + raise ValueError("Missing token: >") + #END handle missing end brace + + actor = Actor._from_string(info[82:email_end+1]) + time, tz_offset = parse_date(info[email_end+2:]) + + return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) + + +class RefLog(list, Serializable): + """A reflog contains reflog entries, each of which defines a certain state + of the head in question. Custom query methods allow to retrieve log entries + by date or by other criteria. + + Reflog entries are orded, the first added entry is first in the list, the last + entry, i.e. the last change of the head or reference, is last in the list.""" + + __slots__ = ('_path', ) + + def __new__(cls, filepath=None): + inst = super(RefLog, cls).__new__(cls) + return inst + + def __init__(self, filepath=None): + """Initialize this instance with an optional filepath, from which we will + initialize our data. The path is also used to write changes back using + the write() method""" + self._path = filepath + if filepath is not None: + self._read_from_file() + # END handle filepath + + def _read_from_file(self): + fmap = file_contents_ro_filepath(self._path, stream=False, allow_mmap=True) + try: + self._deserialize(fmap) + finally: + fmap.close() + #END handle closing of handle + + #{ Interface + + @classmethod + def from_file(cls, filepath): + """ + :return: a new RefLog instance containing all entries from the reflog + at the given filepath + :param filepath: path to reflog + :raise ValueError: If the file could not be read or was corrupted in some way""" + return cls(filepath) + + @classmethod + def path(cls, ref): + """ + :return: string to absolute path at which the reflog of the given ref + instance would be found. The path is not guaranteed to point to a valid + file though. + :param ref: SymbolicReference instance""" + return join(ref.repo.git_dir, "logs", to_native_path(ref.path)) + + @classmethod + def iter_entries(cls, stream): + """ + :return: Iterator yielding RefLogEntry instances, one for each line read + sfrom the given stream. + :param stream: file-like object containing the revlog in its native format + or basestring instance pointing to a file to read""" + new_entry = RefLogEntry.from_line + if isinstance(stream, basestring): + stream = file_contents_ro_filepath(stream) + #END handle stream type + while True: + line = stream.readline() + if not line: + return + yield new_entry(line.strip()) + #END endless loop + + @classmethod + def entry_at(cls, filepath, index): + """:return: RefLogEntry at the given index + :param filepath: full path to the index file from which to read the entry + :param index: python list compatible index, i.e. it may be negative to + specifiy an entry counted from the end of the list + + :raise IndexError: If the entry didn't exist + + .. note:: This method is faster as it only parses the entry at index, skipping + all other lines. Nonetheless, the whole file has to be read if + the index is negative + """ + fp = open(filepath, 'rb') + if index < 0: + return RefLogEntry.from_line(fp.readlines()[index].strip()) + else: + # read until index is reached + for i in xrange(index+1): + line = fp.readline() + if not line: + break + #END abort on eof + #END handle runup + + if i != index or not line: + raise IndexError + #END handle exception + + return RefLogEntry.from_line(line.strip()) + #END handle index + + def to_file(self, filepath): + """Write the contents of the reflog instance to a file at the given filepath. + :param filepath: path to file, parent directories are assumed to exist""" + lfd = LockedFD(filepath) + assure_directory_exists(filepath, is_file=True) + + fp = lfd.open(write=True, stream=True) + try: + self._serialize(fp) + lfd.commit() + except: + # on failure it rolls back automatically, but we make it clear + lfd.rollback() + raise + #END handle change + + @classmethod + def append_entry(cls, config_reader, filepath, oldbinsha, newbinsha, message): + """Append a new log entry to the revlog at filepath. + + :param config_reader: configuration reader of the repository - used to obtain + user information. May be None + :param filepath: full path to the log file + :param oldbinsha: binary sha of the previous commit + :param newbinsha: binary sha of the current commit + :param message: message describing the change to the reference + :param write: If True, the changes will be written right away. Otherwise + the change will not be written + :return: RefLogEntry objects which was appended to the log + :note: As we are append-only, concurrent access is not a problem as we + do not interfere with readers.""" + if len(oldbinsha) != 20 or len(newbinsha) != 20: + raise ValueError("Shas need to be given in binary format") + #END handle sha type + assure_directory_exists(filepath, is_file=True) + entry = RefLogEntry((bin_to_hex(oldbinsha), bin_to_hex(newbinsha), Actor.committer(config_reader), (int(time.time()), time.altzone), message)) + + lf = LockFile(filepath) + lf._obtain_lock_or_raise() + + fd = open(filepath, 'a') + try: + fd.write(repr(entry)) + finally: + fd.close() + lf._release_lock() + #END handle write operation + + return entry + + def write(self): + """Write this instance's data to the file we are originating from + :return: self""" + if self._path is None: + raise ValueError("Instance was not initialized with a path, use to_file(...) instead") + #END assert path + self.to_file(self._path) + return self + + #} END interface + + #{ Serializable Interface + def _serialize(self, stream): + lm1 = len(self) - 1 + write = stream.write + + # write all entries + for e in self: + write(repr(e)) + #END for each entry + + def _deserialize(self, stream): + self.extend(self.iter_entries(stream)) + #} END serializable interface diff --git a/git/refs/reference.py b/git/refs/reference.py new file mode 100644 index 00000000..1a745ee9 --- /dev/null +++ b/git/refs/reference.py @@ -0,0 +1,84 @@ +from symbolic import SymbolicReference +import os +from git.objects import Object +from git.util import ( + LazyMixin, + Iterable, + ) + +from gitdb.util import ( + isfile, + hex_to_bin + ) + +__all__ = ["Reference"] + + +class Reference(SymbolicReference, LazyMixin, Iterable): + """Represents a named reference to any object. Subclasses may apply restrictions though, + i.e. Heads can only point to commits.""" + __slots__ = tuple() + _points_to_commits_only = False + _resolve_ref_on_create = True + _common_path_default = "refs" + + def __init__(self, repo, path): + """Initialize this instance + :param repo: Our parent repository + + :param path: + Path relative to the .git/ directory pointing to the ref in question, i.e. + refs/heads/master""" + if not path.startswith(self._common_path_default+'/'): + raise ValueError("Cannot instantiate %r from path %s" % ( self.__class__.__name__, path )) + super(Reference, self).__init__(repo, path) + + + def __str__(self): + return self.name + + def set_object(self, object, logmsg = None): + """Special version which checks if the head-log needs an update as well""" + oldbinsha = None + if logmsg is not None: + head = self.repo.head + if not head.is_detached and head.ref == self: + oldbinsha = self.commit.binsha + #END handle commit retrieval + #END handle message is set + + super(Reference, self).set_object(object, logmsg) + + if oldbinsha is not None: + # /* from refs.c in git-source + # * Special hack: If a branch is updated directly and HEAD + # * points to it (may happen on the remote side of a push + # * for example) then logically the HEAD reflog should be + # * updated too. + # * A generic solution implies reverse symref information, + # * but finding all symrefs pointing to the given branch + # * would be rather costly for this rare event (the direct + # * update of a branch) to be worth it. So let's cheat and + # * check with HEAD only which should cover 99% of all usage + # * scenarios (even 100% of the default ones). + # */ + self.repo.head.log_append(oldbinsha, logmsg) + #END check if the head + + # NOTE: Don't have to overwrite properties as the will only work without a the log + + @property + def name(self): + """:return: (shortest) Name of this reference - it may contain path components""" + # first two path tokens are can be removed as they are + # refs/heads or refs/tags or refs/remotes + tokens = self.path.split('/') + if len(tokens) < 3: + return self.path # could be refs/HEAD + return '/'.join(tokens[2:]) + + @classmethod + def iter_items(cls, repo, common_path = None): + """Equivalent to SymbolicReference.iter_items, but will return non-detached + references as well.""" + return cls._iter_items(repo, common_path) diff --git a/git/refs/remote.py b/git/refs/remote.py new file mode 100644 index 00000000..b7b07d4b --- /dev/null +++ b/git/refs/remote.py @@ -0,0 +1,63 @@ +from head import Head +from git.util import join_path +from gitdb.util import join + +import os + + +__all__ = ["RemoteReference"] + + +class RemoteReference(Head): + """Represents a reference pointing to a remote head.""" + _common_path_default = "refs/remotes" + + + @classmethod + def iter_items(cls, repo, common_path = None, remote=None): + """Iterate remote references, and if given, constrain them to the given remote""" + common_path = common_path or cls._common_path_default + if remote is not None: + common_path = join_path(common_path, str(remote)) + # END handle remote constraint + return super(RemoteReference, cls).iter_items(repo, common_path) + + @property + def remote_name(self): + """ + :return: + Name of the remote we are a reference of, such as 'origin' for a reference + named 'origin/master'""" + tokens = self.path.split('/') + # /refs/remotes/<remote name>/<branch_name> + return tokens[2] + + @property + def remote_head(self): + """:return: Name of the remote head itself, i.e. master. + :note: The returned name is usually not qualified enough to uniquely identify + a branch""" + tokens = self.path.split('/') + return '/'.join(tokens[3:]) + + @classmethod + def delete(cls, repo, *refs, **kwargs): + """Delete the given remote references. + :note: + kwargs are given for compatability with the base class method as we + should not narrow the signature.""" + repo.git.branch("-d", "-r", *refs) + # the official deletion method will ignore remote symbolic refs - these + # are generally ignored in the refs/ folder. We don't though + # and delete remainders manually + for ref in refs: + try: + os.remove(join(repo.git_dir, ref.path)) + except OSError: + pass + # END for each ref + + @classmethod + def create(cls, *args, **kwargs): + """Used to disable this method""" + raise TypeError("Cannot explicitly create remote references") diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py new file mode 100644 index 00000000..9937cf0c --- /dev/null +++ b/git/refs/symbolic.py @@ -0,0 +1,618 @@ +import os +from git.objects import Object, Commit +from git.util import ( + join_path, + join_path_native, + to_native_path_linux, + assure_directory_exists + ) + +from gitdb.exc import BadObject +from gitdb.util import ( + join, + dirname, + isdir, + exists, + isfile, + rename, + hex_to_bin, + LockedFD + ) + +from log import RefLog + +__all__ = ["SymbolicReference"] + +class SymbolicReference(object): + """Represents a special case of a reference such that this reference is symbolic. + It does not point to a specific commit, but to another Head, which itself + specifies a commit. + + A typical example for a symbolic reference is HEAD.""" + __slots__ = ("repo", "path") + _resolve_ref_on_create = False + _points_to_commits_only = True + _common_path_default = "" + _id_attribute_ = "name" + + def __init__(self, repo, path): + self.repo = repo + self.path = path + + def __str__(self): + return self.path + + def __repr__(self): + return '<git.%s "%s">' % (self.__class__.__name__, self.path) + + def __eq__(self, other): + return self.path == other.path + + def __ne__(self, other): + return not ( self == other ) + + def __hash__(self): + return hash(self.path) + + @property + def name(self): + """ + :return: + In case of symbolic references, the shortest assumable name + is the path itself.""" + return self.path + + @property + def abspath(self): + return join_path_native(self.repo.git_dir, self.path) + + @classmethod + def _get_packed_refs_path(cls, repo): + return join(repo.git_dir, 'packed-refs') + + @classmethod + def _iter_packed_refs(cls, repo): + """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. + :note: The packed refs file will be kept open as long as we iterate""" + try: + fp = open(cls._get_packed_refs_path(repo), 'r') + for line in fp: + line = line.strip() + if not line: + continue + if line.startswith('#'): + if line.startswith('# pack-refs with:') and not line.endswith('peeled'): + raise TypeError("PackingType of packed-Refs not understood: %r" % line) + # END abort if we do not understand the packing scheme + continue + # END parse comment + + # skip dereferenced tag object entries - previous line was actual + # tag reference for it + if line[0] == '^': + continue + + yield tuple(line.split(' ', 1)) + # END for each line + except (OSError,IOError): + raise StopIteration + # END no packed-refs file handling + # NOTE: Had try-finally block around here to close the fp, + # but some python version woudn't allow yields within that. + # I believe files are closing themselves on destruction, so it is + # alright. + + @classmethod + def dereference_recursive(cls, repo, ref_path): + """ + :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all + intermediate references as required + :param repo: the repository containing the reference at ref_path""" + while True: + hexsha, ref_path = cls._get_ref_info(repo, ref_path) + if hexsha is not None: + return hexsha + # END recursive dereferencing + + @classmethod + def _get_ref_info(cls, repo, ref_path): + """Return: (sha, target_ref_path) if available, the sha the file at + rela_path points to, or None. target_ref_path is the reference we + point to, or None""" + tokens = None + try: + fp = open(join(repo.git_dir, ref_path), 'r') + value = fp.read().rstrip() + fp.close() + tokens = value.split(" ") + except (OSError,IOError): + # Probably we are just packed, find our entry in the packed refs file + # NOTE: We are not a symbolic ref if we are in a packed file, as these + # are excluded explictly + for sha, path in cls._iter_packed_refs(repo): + if path != ref_path: continue + tokens = (sha, path) + break + # END for each packed ref + # END handle packed refs + if tokens is None: + raise ValueError("Reference at %r does not exist" % ref_path) + + # is it a reference ? + if tokens[0] == 'ref:': + return (None, tokens[1]) + + # its a commit + if repo.re_hexsha_only.match(tokens[0]): + return (tokens[0], None) + + raise ValueError("Failed to parse reference information from %r" % ref_path) + + def _get_object(self): + """ + :return: + The object our ref currently refers to. Refs can be cached, they will + always point to the actual object as it gets re-created on each query""" + # have to be dynamic here as we may be a tag which can point to anything + # Our path will be resolved to the hexsha which will be used accordingly + return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))) + + def _get_commit(self): + """ + :return: + Commit object we point to, works for detached and non-detached + SymbolicReferences. The symbolic reference will be dereferenced recursively.""" + obj = self._get_object() + if obj.type == 'tag': + obj = obj.object + #END dereference tag + + if obj.type != Commit.type: + raise TypeError("Symbolic Reference pointed to object %r, commit was required" % obj) + #END handle type + return obj + + def set_commit(self, commit, logmsg = None): + """As set_object, but restricts the type of object to be a Commit + + :raise ValueError: If commit is not a Commit object or doesn't point to + a commit + :return: self""" + # check the type - assume the best if it is a base-string + invalid_type = False + if isinstance(commit, Object): + invalid_type = commit.type != Commit.type + elif isinstance(commit, SymbolicReference): + invalid_type = commit.object.type != Commit.type + else: + try: + invalid_type = self.repo.rev_parse(commit).type != Commit.type + except BadObject: + raise ValueError("Invalid object: %s" % commit) + #END handle exception + # END verify type + + if invalid_type: + raise ValueError("Need commit, got %r" % commit) + #END handle raise + + # we leave strings to the rev-parse method below + self.set_object(commit, logmsg) + + return self + + + def set_object(self, object, logmsg = None): + """Set the object we point to, possibly dereference our symbolic reference first. + If the reference does not exist, it will be created + + :param object: a refspec, a SymbolicReference or an Object instance. SymbolicReferences + will be dereferenced beforehand to obtain the object they point to + :param logmsg: If not None, the message will be used in the reflog entry to be + written. Otherwise the reflog is not altered + :note: plain SymbolicReferences may not actually point to objects by convention + :return: self""" + if isinstance(object, SymbolicReference): + object = object.object + #END resolve references + + is_detached = True + try: + is_detached = self.is_detached + except ValueError: + pass + # END handle non-existing ones + + if is_detached: + return self.set_reference(object, logmsg) + + # set the commit on our reference + return self._get_reference().set_object(object, logmsg) + + commit = property(_get_commit, set_commit, doc="Query or set commits directly") + object = property(_get_object, set_object, doc="Return the object our ref currently refers to") + + def _get_reference(self): + """:return: Reference Object we point to + :raise TypeError: If this symbolic reference is detached, hence it doesn't point + to a reference, but to a commit""" + sha, target_ref_path = self._get_ref_info(self.repo, self.path) + if target_ref_path is None: + raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) + return self.from_path(self.repo, target_ref_path) + + def set_reference(self, ref, logmsg = None): + """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. + Otherwise an Object, given as Object instance or refspec, is assumed and if valid, + will be set which effectively detaches the refererence if it was a purely + symbolic one. + + :param ref: SymbolicReference instance, Object instance or refspec string + Only if the ref is a SymbolicRef instance, we will point to it. Everthiny + else is dereferenced to obtain the actual object. + :param logmsg: If set to a string, the message will be used in the reflog. + Otherwise, a reflog entry is not written for the changed reference. + The previous commit of the entry will be the commit we point to now. + + See also: log_append() + + :return: self + :note: This symbolic reference will not be dereferenced. For that, see + ``set_object(...)``""" + write_value = None + obj = None + if isinstance(ref, SymbolicReference): + write_value = "ref: %s" % ref.path + elif isinstance(ref, Object): + obj = ref + write_value = ref.hexsha + elif isinstance(ref, basestring): + try: + obj = self.repo.rev_parse(ref+"^{}") # optionally deref tags + write_value = obj.hexsha + except BadObject: + raise ValueError("Could not extract object from %s" % ref) + # END end try string + else: + raise ValueError("Unrecognized Value: %r" % ref) + # END try commit attribute + + # typecheck + if obj is not None and self._points_to_commits_only and obj.type != Commit.type: + raise TypeError("Require commit, got %r" % obj) + #END verify type + + oldbinsha = None + if logmsg is not None: + try: + oldbinsha = self.commit.binsha + except ValueError: + oldbinsha = Commit.NULL_BIN_SHA + #END handle non-existing + #END retrieve old hexsha + + fpath = self.abspath + assure_directory_exists(fpath, is_file=True) + + lfd = LockedFD(fpath) + fd = lfd.open(write=True, stream=True) + fd.write(write_value) + lfd.commit() + + # Adjust the reflog + if logmsg is not None: + self.log_append(oldbinsha, logmsg) + #END handle reflog + + return self + + + # aliased reference + reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") + ref = reference + + def is_valid(self): + """ + :return: + True if the reference is valid, hence it can be read and points to + a valid object or reference.""" + try: + self.object + except (OSError, ValueError): + return False + else: + return True + + @property + def is_detached(self): + """ + :return: + True if we are a detached reference, hence we point to a specific commit + instead to another reference""" + try: + self.ref + return False + except TypeError: + return True + + def log(self): + """ + :return: RefLog for this reference. Its last entry reflects the latest change + applied to this reference + + .. note:: As the log is parsed every time, its recommended to cache it for use + instead of calling this method repeatedly. It should be considered read-only.""" + return RefLog.from_file(RefLog.path(self)) + + def log_append(self, oldbinsha, message, newbinsha=None): + """Append a logentry to the logfile of this ref + + :param oldbinsha: binary sha this ref used to point to + :param message: A message describing the change + :param newbinsha: The sha the ref points to now. If None, our current commit sha + will be used + :return: added RefLogEntry instance""" + return RefLog.append_entry(self.repo.config_reader(), RefLog.path(self), oldbinsha, + (newbinsha is None and self.commit.binsha) or newbinsha, + message) + + def log_entry(self, index): + """:return: RefLogEntry at the given index + :param index: python list compatible positive or negative index + + .. note:: This method must read part of the reflog during execution, hence + it should be used sparringly, or only if you need just one index. + In that case, it will be faster than the ``log()`` method""" + return RefLog.entry_at(RefLog.path(self), index) + + @classmethod + def to_full_path(cls, path): + """ + :return: string with a full repository-relative path which can be used to initialize + a Reference instance, for instance by using ``Reference.from_path``""" + if isinstance(path, SymbolicReference): + path = path.path + full_ref_path = path + if not cls._common_path_default: + return full_ref_path + if not path.startswith(cls._common_path_default+"/"): + full_ref_path = '%s/%s' % (cls._common_path_default, path) + return full_ref_path + + @classmethod + def delete(cls, repo, path): + """Delete the reference at the given path + + :param repo: + Repository to delete the reference from + + :param path: + Short or full path pointing to the reference, i.e. refs/myreference + or just "myreference", hence 'refs/' is implied. + Alternatively the symbolic reference to be deleted""" + full_ref_path = cls.to_full_path(path) + abs_path = join(repo.git_dir, full_ref_path) + if exists(abs_path): + os.remove(abs_path) + else: + # check packed refs + pack_file_path = cls._get_packed_refs_path(repo) + try: + reader = open(pack_file_path) + except (OSError,IOError): + pass # it didnt exist at all + else: + new_lines = list() + made_change = False + dropped_last_line = False + for line in reader: + # keep line if it is a comment or if the ref to delete is not + # in the line + # If we deleted the last line and this one is a tag-reference object, + # we drop it as well + if ( line.startswith('#') or full_ref_path not in line ) and \ + ( not dropped_last_line or dropped_last_line and not line.startswith('^') ): + new_lines.append(line) + dropped_last_line = False + continue + # END skip comments and lines without our path + + # drop this line + made_change = True + dropped_last_line = True + # END for each line in packed refs + reader.close() + + # write the new lines + if made_change: + open(pack_file_path, 'w').writelines(new_lines) + # END open exception handling + # END handle deletion + + # delete the reflog + reflog_path = RefLog.path(cls(repo, full_ref_path)) + if os.path.isfile(reflog_path): + os.remove(reflog_path) + #END remove reflog + + + @classmethod + def _create(cls, repo, path, resolve, reference, force, logmsg=None): + """internal method used to create a new symbolic reference. + If resolve is False, the reference will be taken as is, creating + a proper symbolic reference. Otherwise it will be resolved to the + corresponding object and a detached symbolic reference will be created + instead""" + full_ref_path = cls.to_full_path(path) + abs_ref_path = join(repo.git_dir, full_ref_path) + + # figure out target data + target = reference + if resolve: + target = repo.rev_parse(str(reference)) + + if not force and isfile(abs_ref_path): + target_data = str(target) + if isinstance(target, SymbolicReference): + target_data = target.path + if not resolve: + target_data = "ref: " + target_data + existing_data = open(abs_ref_path, 'rb').read().strip() + if existing_data != target_data: + raise OSError("Reference at %r does already exist, pointing to %r, requested was %r" % (full_ref_path, existing_data, target_data)) + # END no force handling + + ref = cls(repo, full_ref_path) + ref.set_reference(target, logmsg) + return ref + + @classmethod + def create(cls, repo, path, reference='HEAD', force=False, logmsg=None): + """Create a new symbolic reference, hence a reference pointing to another reference. + + :param repo: + Repository to create the reference in + + :param path: + full path at which the new symbolic reference is supposed to be + created at, i.e. "NEW_HEAD" or "symrefs/my_new_symref" + + :param reference: + The reference to which the new symbolic reference should point to + + :param force: + if True, force creation even if a symbolic reference with that name already exists. + Raise OSError otherwise + + :param logmsg: + If not None, the message to append to the reflog. Otherwise no reflog + entry is written. + + :return: Newly created symbolic Reference + + :raise OSError: + If a (Symbolic)Reference with the same name but different contents + already exists. + + :note: This does not alter the current HEAD, index or Working Tree""" + return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg) + + def rename(self, new_path, force=False): + """Rename self to a new path + + :param new_path: + Either a simple name or a full path, i.e. new_name or features/new_name. + The prefix refs/ is implied for references and will be set as needed. + In case this is a symbolic ref, there is no implied prefix + + :param force: + If True, the rename will succeed even if a head with the target name + already exists. It will be overwritten in that case + + :return: self + :raise OSError: In case a file at path but a different contents already exists """ + new_path = self.to_full_path(new_path) + if self.path == new_path: + return self + + new_abs_path = join(self.repo.git_dir, new_path) + cur_abs_path = join(self.repo.git_dir, self.path) + if isfile(new_abs_path): + if not force: + # if they point to the same file, its not an error + if open(new_abs_path,'rb').read().strip() != open(cur_abs_path,'rb').read().strip(): + raise OSError("File at path %r already exists" % new_abs_path) + # else: we could remove ourselves and use the otherone, but + # but clarity we just continue as usual + # END not force handling + os.remove(new_abs_path) + # END handle existing target file + + dname = dirname(new_abs_path) + if not isdir(dname): + os.makedirs(dname) + # END create directory + + rename(cur_abs_path, new_abs_path) + self.path = new_path + + return self + + @classmethod + def _iter_items(cls, repo, common_path = None): + if common_path is None: + common_path = cls._common_path_default + rela_paths = set() + + # walk loose refs + # Currently we do not follow links + for root, dirs, files in os.walk(join_path_native(repo.git_dir, common_path)): + if 'refs/' not in root: # skip non-refs subfolders + refs_id = [ i for i,d in enumerate(dirs) if d == 'refs' ] + if refs_id: + dirs[0:] = ['refs'] + # END prune non-refs folders + + for f in files: + abs_path = to_native_path_linux(join_path(root, f)) + rela_paths.add(abs_path.replace(to_native_path_linux(repo.git_dir) + '/', "")) + # END for each file in root directory + # END for each directory to walk + + # read packed refs + for sha, rela_path in cls._iter_packed_refs(repo): + if rela_path.startswith(common_path): + rela_paths.add(rela_path) + # END relative path matches common path + # END packed refs reading + + # return paths in sorted order + for path in sorted(rela_paths): + try: + yield cls.from_path(repo, path) + except ValueError: + continue + # END for each sorted relative refpath + + @classmethod + def iter_items(cls, repo, common_path = None): + """Find all refs in the repository + + :param repo: is the Repo + + :param common_path: + Optional keyword argument to the path which is to be shared by all + returned Ref objects. + Defaults to class specific portion if None assuring that only + refs suitable for the actual class are returned. + + :return: + git.SymbolicReference[], each of them is guaranteed to be a symbolic + ref which is not detached. + + List is lexigraphically sorted + The returned objects represent actual subclasses, such as Head or TagReference""" + return ( r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached ) + + @classmethod + def from_path(cls, repo, path): + """ + :param path: full .git-directory-relative path name to the Reference to instantiate + :note: use to_full_path() if you only have a partial path of a known Reference Type + :return: + Instance of type Reference, Head, or Tag + depending on the given path""" + if not path: + raise ValueError("Cannot create Reference from %r" % path) + + for ref_type in (HEAD, Head, RemoteReference, TagReference, Reference, SymbolicReference): + try: + instance = ref_type(repo, path) + if instance.__class__ == SymbolicReference and instance.is_detached: + raise ValueError("SymbolRef was detached, we drop it") + return instance + except ValueError: + pass + # END exception handling + # END for each type to try + raise ValueError("Could not find reference type suitable to handle path %r" % path) diff --git a/git/refs/tag.py b/git/refs/tag.py new file mode 100644 index 00000000..c09d814d --- /dev/null +++ b/git/refs/tag.py @@ -0,0 +1,91 @@ +from reference import Reference + +__all__ = ["TagReference", "Tag"] + + + +class TagReference(Reference): + """Class representing a lightweight tag reference which either points to a commit + ,a tag object or any other object. In the latter case additional information, + like the signature or the tag-creator, is available. + + This tag object will always point to a commit object, but may carray additional + information in a tag object:: + + tagref = TagReference.list_items(repo)[0] + print tagref.commit.message + if tagref.tag is not None: + print tagref.tag.message""" + + __slots__ = tuple() + _common_path_default = "refs/tags" + + @property + def commit(self): + """:return: Commit object the tag ref points to""" + obj = self.object + if obj.type == "commit": + return obj + elif obj.type == "tag": + # it is a tag object which carries the commit as an object - we can point to anything + return obj.object + else: + raise ValueError( "Tag %s points to a Blob or Tree - have never seen that before" % self ) + + @property + def tag(self): + """ + :return: Tag object this tag ref points to or None in case + we are a light weight tag""" + obj = self.object + if obj.type == "tag": + return obj + return None + + # make object read-only + # It should be reasonably hard to adjust an existing tag + object = property(Reference._get_object) + + @classmethod + def create(cls, repo, path, ref='HEAD', message=None, force=False, **kwargs): + """Create a new tag reference. + + :param path: + The name of the tag, i.e. 1.0 or releases/1.0. + The prefix refs/tags is implied + + :param ref: + A reference to the object you want to tag. It can be a commit, tree or + blob. + + :param message: + If not None, the message will be used in your tag object. This will also + create an additional tag object that allows to obtain that information, i.e.:: + + tagref.tag.message + + :param force: + If True, to force creation of a tag even though that tag already exists. + + :param kwargs: + Additional keyword arguments to be passed to git-tag + + :return: A new TagReference""" + args = ( path, ref ) + if message: + kwargs['m'] = message + if force: + kwargs['f'] = True + + repo.git.tag(*args, **kwargs) + return TagReference(repo, "%s/%s" % (cls._common_path_default, path)) + + @classmethod + def delete(cls, repo, *tags): + """Delete the given existing tag or tags""" + repo.git.tag("-d", *tags) + + + +# provide an alias +Tag = TagReference |