diff options
Diffstat (limited to 'refs/symbolic.py')
-rw-r--r-- | refs/symbolic.py | 512 |
1 files changed, 512 insertions, 0 deletions
diff --git a/refs/symbolic.py b/refs/symbolic.py new file mode 100644 index 00000000..3329d51c --- /dev/null +++ b/refs/symbolic.py @@ -0,0 +1,512 @@ +import os +from git.objects import Commit +from git.util import ( + join_path, + join_path_native, + to_native_path_linux + ) + +from gitdb.util import ( + join, + dirname, + isdir, + exists, + isfile, + rename, + hex_to_bin + ) + +__all__ = ["SymbolicReference"] + +class SymbolicReference(object): + """Represents a special case of a reference such that this reference is symbolic. + It does not point to a specific commit, but to another Head, which itself + specifies a commit. + + A typical example for a symbolic reference is HEAD.""" + __slots__ = ("repo", "path") + _common_path_default = "" + _id_attribute_ = "name" + + def __init__(self, repo, path): + self.repo = repo + self.path = path + + def __str__(self): + return self.path + + def __repr__(self): + return '<git.%s "%s">' % (self.__class__.__name__, self.path) + + def __eq__(self, other): + return self.path == other.path + + def __ne__(self, other): + return not ( self == other ) + + def __hash__(self): + return hash(self.path) + + @property + def name(self): + """ + :return: + In case of symbolic references, the shortest assumable name + is the path itself.""" + return self.path + + def _abs_path(self): + return join_path_native(self.repo.git_dir, self.path) + + @classmethod + def _get_packed_refs_path(cls, repo): + return join(repo.git_dir, 'packed-refs') + + @classmethod + def _iter_packed_refs(cls, repo): + """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. + :note: The packed refs file will be kept open as long as we iterate""" + try: + fp = open(cls._get_packed_refs_path(repo), 'r') + for line in fp: + line = line.strip() + if not line: + continue + if line.startswith('#'): + if line.startswith('# pack-refs with:') and not line.endswith('peeled'): + raise TypeError("PackingType of packed-Refs not understood: %r" % line) + # END abort if we do not understand the packing scheme + continue + # END parse comment + + # skip dereferenced tag object entries - previous line was actual + # tag reference for it + if line[0] == '^': + continue + + yield tuple(line.split(' ', 1)) + # END for each line + except (OSError,IOError): + raise StopIteration + # END no packed-refs file handling + # NOTE: Had try-finally block around here to close the fp, + # but some python version woudn't allow yields within that. + # I believe files are closing themselves on destruction, so it is + # alright. + + @classmethod + def dereference_recursive(cls, repo, ref_path): + """ + :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all + intermediate references as required + :param repo: the repository containing the reference at ref_path""" + while True: + ref = cls(repo, ref_path) + hexsha, ref_path = ref._get_ref_info() + if hexsha is not None: + return hexsha + # END recursive dereferencing + + def _get_ref_info(self): + """Return: (sha, target_ref_path) if available, the sha the file at + rela_path points to, or None. target_ref_path is the reference we + point to, or None""" + tokens = None + try: + fp = open(self._abs_path(), 'r') + value = fp.read().rstrip() + fp.close() + tokens = value.split(" ") + except (OSError,IOError): + # Probably we are just packed, find our entry in the packed refs file + # NOTE: We are not a symbolic ref if we are in a packed file, as these + # are excluded explictly + for sha, path in self._iter_packed_refs(self.repo): + if path != self.path: continue + tokens = (sha, path) + break + # END for each packed ref + # END handle packed refs + + if tokens is None: + raise ValueError("Reference at %r does not exist" % self.path) + + # is it a reference ? + if tokens[0] == 'ref:': + return (None, tokens[1]) + + # its a commit + if self.repo.re_hexsha_only.match(tokens[0]): + return (tokens[0], None) + + raise ValueError("Failed to parse reference information from %r" % self.path) + + def _get_commit(self): + """ + :return: + Commit object we point to, works for detached and non-detached + SymbolicReferences""" + # we partially reimplement it to prevent unnecessary file access + hexsha, target_ref_path = self._get_ref_info() + + # it is a detached reference + if hexsha: + return Commit(self.repo, hex_to_bin(hexsha)) + + return self.from_path(self.repo, target_ref_path).commit + + def _set_commit(self, commit): + """Set our commit, possibly dereference our symbolic reference first. + If the reference does not exist, it will be created""" + is_detached = True + try: + is_detached = self.is_detached + except ValueError: + pass + # END handle non-existing ones + if is_detached: + return self._set_reference(commit) + + # set the commit on our reference + self._get_reference().commit = commit + + commit = property(_get_commit, _set_commit, doc="Query or set commits directly") + + def _get_reference(self): + """:return: Reference Object we point to""" + sha, target_ref_path = self._get_ref_info() + if target_ref_path is None: + raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) + return self.from_path(self.repo, target_ref_path) + + def _set_reference(self, ref): + """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. + Otherwise we try to get a commit from it using our interface. + + Strings are allowed but will be checked to be sure we have a commit""" + write_value = None + if isinstance(ref, SymbolicReference): + write_value = "ref: %s" % ref.path + elif isinstance(ref, Commit): + write_value = ref.hexsha + else: + try: + write_value = ref.commit.hexsha + except AttributeError: + try: + obj = self.repo.rev_parse(ref+"^{}") # optionally deref tags + if obj.type != "commit": + raise TypeError("Invalid object type behind sha: %s" % sha) + write_value = obj.hexsha + except Exception: + raise ValueError("Could not extract object from %s" % ref) + # END end try string + # END try commit attribute + + # maintain the orig-head if we are currently checked-out + head = HEAD(self.repo) + try: + if head.ref == self: + try: + # TODO: implement this atomically, if we fail below, orig_head is at an incorrect spot + # Enforce the creation of ORIG_HEAD + SymbolicReference.create(self.repo, head.orig_head().name, self.commit, force=True) + except ValueError: + pass + #END exception handling + # END if we are checked-out + except TypeError: + pass + # END handle detached heads + + # if we are writing a ref, use symbolic ref to get the reflog and more + # checking + # Otherwise we detach it and have to do it manually. Besides, this works + # recursively automaitcally, but should be replaced with a python implementation + # soon + if write_value.startswith('ref:'): + self.repo.git.symbolic_ref(self.path, write_value[5:]) + return + # END non-detached handling + + path = self._abs_path() + directory = dirname(path) + if not isdir(directory): + os.makedirs(directory) + + fp = open(path, "wb") + try: + fp.write(write_value) + finally: + fp.close() + # END writing + + + # aliased reference + reference = property(_get_reference, _set_reference, doc="Returns the Reference we point to") + ref = reference + + def is_valid(self): + """ + :return: + True if the reference is valid, hence it can be read and points to + a valid object or reference.""" + try: + self.commit + except (OSError, ValueError): + return False + else: + return True + + @property + def is_detached(self): + """ + :return: + True if we are a detached reference, hence we point to a specific commit + instead to another reference""" + try: + self.reference + return False + except TypeError: + return True + + + @classmethod + def to_full_path(cls, path): + """ + :return: string with a full repository-relative path which can be used to initialize + a Reference instance, for instance by using ``Reference.from_path``""" + if isinstance(path, SymbolicReference): + path = path.path + full_ref_path = path + if not cls._common_path_default: + return full_ref_path + if not path.startswith(cls._common_path_default+"/"): + full_ref_path = '%s/%s' % (cls._common_path_default, path) + return full_ref_path + + @classmethod + def delete(cls, repo, path): + """Delete the reference at the given path + + :param repo: + Repository to delete the reference from + + :param path: + Short or full path pointing to the reference, i.e. refs/myreference + or just "myreference", hence 'refs/' is implied. + Alternatively the symbolic reference to be deleted""" + full_ref_path = cls.to_full_path(path) + abs_path = join(repo.git_dir, full_ref_path) + if exists(abs_path): + os.remove(abs_path) + else: + # check packed refs + pack_file_path = cls._get_packed_refs_path(repo) + try: + reader = open(pack_file_path) + except (OSError,IOError): + pass # it didnt exist at all + else: + new_lines = list() + made_change = False + dropped_last_line = False + for line in reader: + # keep line if it is a comment or if the ref to delete is not + # in the line + # If we deleted the last line and this one is a tag-reference object, + # we drop it as well + if ( line.startswith('#') or full_ref_path not in line ) and \ + ( not dropped_last_line or dropped_last_line and not line.startswith('^') ): + new_lines.append(line) + dropped_last_line = False + continue + # END skip comments and lines without our path + + # drop this line + made_change = True + dropped_last_line = True + # END for each line in packed refs + reader.close() + + # write the new lines + if made_change: + open(pack_file_path, 'w').writelines(new_lines) + # END open exception handling + # END handle deletion + + @classmethod + def _create(cls, repo, path, resolve, reference, force): + """internal method used to create a new symbolic reference. + If resolve is False,, the reference will be taken as is, creating + a proper symbolic reference. Otherwise it will be resolved to the + corresponding object and a detached symbolic reference will be created + instead""" + full_ref_path = cls.to_full_path(path) + abs_ref_path = join(repo.git_dir, full_ref_path) + + # figure out target data + target = reference + if resolve: + target = repo.rev_parse(str(reference)) + + if not force and isfile(abs_ref_path): + target_data = str(target) + if isinstance(target, SymbolicReference): + target_data = target.path + if not resolve: + target_data = "ref: " + target_data + if open(abs_ref_path, 'rb').read().strip() != target_data: + raise OSError("Reference at %s does already exist" % full_ref_path) + # END no force handling + + ref = cls(repo, full_ref_path) + ref.reference = target + return ref + + @classmethod + def create(cls, repo, path, reference='HEAD', force=False ): + """Create a new symbolic reference, hence a reference pointing to another reference. + + :param repo: + Repository to create the reference in + + :param path: + full path at which the new symbolic reference is supposed to be + created at, i.e. "NEW_HEAD" or "symrefs/my_new_symref" + + :param reference: + The reference to which the new symbolic reference should point to + + :param force: + if True, force creation even if a symbolic reference with that name already exists. + Raise OSError otherwise + + :return: Newly created symbolic Reference + + :raise OSError: + If a (Symbolic)Reference with the same name but different contents + already exists. + + :note: This does not alter the current HEAD, index or Working Tree""" + return cls._create(repo, path, False, reference, force) + + def rename(self, new_path, force=False): + """Rename self to a new path + + :param new_path: + Either a simple name or a full path, i.e. new_name or features/new_name. + The prefix refs/ is implied for references and will be set as needed. + In case this is a symbolic ref, there is no implied prefix + + :param force: + If True, the rename will succeed even if a head with the target name + already exists. It will be overwritten in that case + + :return: self + :raise OSError: In case a file at path but a different contents already exists """ + new_path = self.to_full_path(new_path) + if self.path == new_path: + return self + + new_abs_path = join(self.repo.git_dir, new_path) + cur_abs_path = join(self.repo.git_dir, self.path) + if isfile(new_abs_path): + if not force: + # if they point to the same file, its not an error + if open(new_abs_path,'rb').read().strip() != open(cur_abs_path,'rb').read().strip(): + raise OSError("File at path %r already exists" % new_abs_path) + # else: we could remove ourselves and use the otherone, but + # but clarity we just continue as usual + # END not force handling + os.remove(new_abs_path) + # END handle existing target file + + dname = dirname(new_abs_path) + if not isdir(dname): + os.makedirs(dname) + # END create directory + + rename(cur_abs_path, new_abs_path) + self.path = new_path + + return self + + @classmethod + def _iter_items(cls, repo, common_path = None): + if common_path is None: + common_path = cls._common_path_default + rela_paths = set() + + # walk loose refs + # Currently we do not follow links + for root, dirs, files in os.walk(join_path_native(repo.git_dir, common_path)): + if 'refs/' not in root: # skip non-refs subfolders + refs_id = [ i for i,d in enumerate(dirs) if d == 'refs' ] + if refs_id: + dirs[0:] = ['refs'] + # END prune non-refs folders + + for f in files: + abs_path = to_native_path_linux(join_path(root, f)) + rela_paths.add(abs_path.replace(to_native_path_linux(repo.git_dir) + '/', "")) + # END for each file in root directory + # END for each directory to walk + + # read packed refs + for sha, rela_path in cls._iter_packed_refs(repo): + if rela_path.startswith(common_path): + rela_paths.add(rela_path) + # END relative path matches common path + # END packed refs reading + + # return paths in sorted order + for path in sorted(rela_paths): + try: + yield cls.from_path(repo, path) + except ValueError: + continue + # END for each sorted relative refpath + + @classmethod + def iter_items(cls, repo, common_path = None): + """Find all refs in the repository + + :param repo: is the Repo + + :param common_path: + Optional keyword argument to the path which is to be shared by all + returned Ref objects. + Defaults to class specific portion if None assuring that only + refs suitable for the actual class are returned. + + :return: + git.SymbolicReference[], each of them is guaranteed to be a symbolic + ref which is not detached. + + List is lexigraphically sorted + The returned objects represent actual subclasses, such as Head or TagReference""" + return ( r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached ) + + @classmethod + def from_path(cls, repo, path): + """ + :param path: full .git-directory-relative path name to the Reference to instantiate + :note: use to_full_path() if you only have a partial path of a known Reference Type + :return: + Instance of type Reference, Head, or Tag + depending on the given path""" + if not path: + raise ValueError("Cannot create Reference from %r" % path) + + for ref_type in (HEAD, Head, RemoteReference, TagReference, Reference, SymbolicReference): + try: + instance = ref_type(repo, path) + if instance.__class__ == SymbolicReference and instance.is_detached: + raise ValueError("SymbolRef was detached, we drop it") + return instance + except ValueError: + pass + # END exception handling + # END for each type to try + raise ValueError("Could not find reference type suitable to handle path %r" % path) |