summaryrefslogtreecommitdiff
path: root/refs/symbolic.py
diff options
context:
space:
mode:
Diffstat (limited to 'refs/symbolic.py')
-rw-r--r--refs/symbolic.py512
1 files changed, 512 insertions, 0 deletions
diff --git a/refs/symbolic.py b/refs/symbolic.py
new file mode 100644
index 00000000..3329d51c
--- /dev/null
+++ b/refs/symbolic.py
@@ -0,0 +1,512 @@
+import os
+from git.objects import Commit
+from git.util import (
+ join_path,
+ join_path_native,
+ to_native_path_linux
+ )
+
+from gitdb.util import (
+ join,
+ dirname,
+ isdir,
+ exists,
+ isfile,
+ rename,
+ hex_to_bin
+ )
+
+__all__ = ["SymbolicReference"]
+
+class SymbolicReference(object):
+ """Represents a special case of a reference such that this reference is symbolic.
+ It does not point to a specific commit, but to another Head, which itself
+ specifies a commit.
+
+ A typical example for a symbolic reference is HEAD."""
+ __slots__ = ("repo", "path")
+ _common_path_default = ""
+ _id_attribute_ = "name"
+
+ def __init__(self, repo, path):
+ self.repo = repo
+ self.path = path
+
+ def __str__(self):
+ return self.path
+
+ def __repr__(self):
+ return '<git.%s "%s">' % (self.__class__.__name__, self.path)
+
+ def __eq__(self, other):
+ return self.path == other.path
+
+ def __ne__(self, other):
+ return not ( self == other )
+
+ def __hash__(self):
+ return hash(self.path)
+
+ @property
+ def name(self):
+ """
+ :return:
+ In case of symbolic references, the shortest assumable name
+ is the path itself."""
+ return self.path
+
+ def _abs_path(self):
+ return join_path_native(self.repo.git_dir, self.path)
+
+ @classmethod
+ def _get_packed_refs_path(cls, repo):
+ return join(repo.git_dir, 'packed-refs')
+
+ @classmethod
+ def _iter_packed_refs(cls, repo):
+ """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs.
+ :note: The packed refs file will be kept open as long as we iterate"""
+ try:
+ fp = open(cls._get_packed_refs_path(repo), 'r')
+ for line in fp:
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith('#'):
+ if line.startswith('# pack-refs with:') and not line.endswith('peeled'):
+ raise TypeError("PackingType of packed-Refs not understood: %r" % line)
+ # END abort if we do not understand the packing scheme
+ continue
+ # END parse comment
+
+ # skip dereferenced tag object entries - previous line was actual
+ # tag reference for it
+ if line[0] == '^':
+ continue
+
+ yield tuple(line.split(' ', 1))
+ # END for each line
+ except (OSError,IOError):
+ raise StopIteration
+ # END no packed-refs file handling
+ # NOTE: Had try-finally block around here to close the fp,
+ # but some python version woudn't allow yields within that.
+ # I believe files are closing themselves on destruction, so it is
+ # alright.
+
+ @classmethod
+ def dereference_recursive(cls, repo, ref_path):
+ """
+ :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all
+ intermediate references as required
+ :param repo: the repository containing the reference at ref_path"""
+ while True:
+ ref = cls(repo, ref_path)
+ hexsha, ref_path = ref._get_ref_info()
+ if hexsha is not None:
+ return hexsha
+ # END recursive dereferencing
+
+ def _get_ref_info(self):
+ """Return: (sha, target_ref_path) if available, the sha the file at
+ rela_path points to, or None. target_ref_path is the reference we
+ point to, or None"""
+ tokens = None
+ try:
+ fp = open(self._abs_path(), 'r')
+ value = fp.read().rstrip()
+ fp.close()
+ tokens = value.split(" ")
+ except (OSError,IOError):
+ # Probably we are just packed, find our entry in the packed refs file
+ # NOTE: We are not a symbolic ref if we are in a packed file, as these
+ # are excluded explictly
+ for sha, path in self._iter_packed_refs(self.repo):
+ if path != self.path: continue
+ tokens = (sha, path)
+ break
+ # END for each packed ref
+ # END handle packed refs
+
+ if tokens is None:
+ raise ValueError("Reference at %r does not exist" % self.path)
+
+ # is it a reference ?
+ if tokens[0] == 'ref:':
+ return (None, tokens[1])
+
+ # its a commit
+ if self.repo.re_hexsha_only.match(tokens[0]):
+ return (tokens[0], None)
+
+ raise ValueError("Failed to parse reference information from %r" % self.path)
+
+ def _get_commit(self):
+ """
+ :return:
+ Commit object we point to, works for detached and non-detached
+ SymbolicReferences"""
+ # we partially reimplement it to prevent unnecessary file access
+ hexsha, target_ref_path = self._get_ref_info()
+
+ # it is a detached reference
+ if hexsha:
+ return Commit(self.repo, hex_to_bin(hexsha))
+
+ return self.from_path(self.repo, target_ref_path).commit
+
+ def _set_commit(self, commit):
+ """Set our commit, possibly dereference our symbolic reference first.
+ If the reference does not exist, it will be created"""
+ is_detached = True
+ try:
+ is_detached = self.is_detached
+ except ValueError:
+ pass
+ # END handle non-existing ones
+ if is_detached:
+ return self._set_reference(commit)
+
+ # set the commit on our reference
+ self._get_reference().commit = commit
+
+ commit = property(_get_commit, _set_commit, doc="Query or set commits directly")
+
+ def _get_reference(self):
+ """:return: Reference Object we point to"""
+ sha, target_ref_path = self._get_ref_info()
+ if target_ref_path is None:
+ raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha))
+ return self.from_path(self.repo, target_ref_path)
+
+ def _set_reference(self, ref):
+ """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference.
+ Otherwise we try to get a commit from it using our interface.
+
+ Strings are allowed but will be checked to be sure we have a commit"""
+ write_value = None
+ if isinstance(ref, SymbolicReference):
+ write_value = "ref: %s" % ref.path
+ elif isinstance(ref, Commit):
+ write_value = ref.hexsha
+ else:
+ try:
+ write_value = ref.commit.hexsha
+ except AttributeError:
+ try:
+ obj = self.repo.rev_parse(ref+"^{}") # optionally deref tags
+ if obj.type != "commit":
+ raise TypeError("Invalid object type behind sha: %s" % sha)
+ write_value = obj.hexsha
+ except Exception:
+ raise ValueError("Could not extract object from %s" % ref)
+ # END end try string
+ # END try commit attribute
+
+ # maintain the orig-head if we are currently checked-out
+ head = HEAD(self.repo)
+ try:
+ if head.ref == self:
+ try:
+ # TODO: implement this atomically, if we fail below, orig_head is at an incorrect spot
+ # Enforce the creation of ORIG_HEAD
+ SymbolicReference.create(self.repo, head.orig_head().name, self.commit, force=True)
+ except ValueError:
+ pass
+ #END exception handling
+ # END if we are checked-out
+ except TypeError:
+ pass
+ # END handle detached heads
+
+ # if we are writing a ref, use symbolic ref to get the reflog and more
+ # checking
+ # Otherwise we detach it and have to do it manually. Besides, this works
+ # recursively automaitcally, but should be replaced with a python implementation
+ # soon
+ if write_value.startswith('ref:'):
+ self.repo.git.symbolic_ref(self.path, write_value[5:])
+ return
+ # END non-detached handling
+
+ path = self._abs_path()
+ directory = dirname(path)
+ if not isdir(directory):
+ os.makedirs(directory)
+
+ fp = open(path, "wb")
+ try:
+ fp.write(write_value)
+ finally:
+ fp.close()
+ # END writing
+
+
+ # aliased reference
+ reference = property(_get_reference, _set_reference, doc="Returns the Reference we point to")
+ ref = reference
+
+ def is_valid(self):
+ """
+ :return:
+ True if the reference is valid, hence it can be read and points to
+ a valid object or reference."""
+ try:
+ self.commit
+ except (OSError, ValueError):
+ return False
+ else:
+ return True
+
+ @property
+ def is_detached(self):
+ """
+ :return:
+ True if we are a detached reference, hence we point to a specific commit
+ instead to another reference"""
+ try:
+ self.reference
+ return False
+ except TypeError:
+ return True
+
+
+ @classmethod
+ def to_full_path(cls, path):
+ """
+ :return: string with a full repository-relative path which can be used to initialize
+ a Reference instance, for instance by using ``Reference.from_path``"""
+ if isinstance(path, SymbolicReference):
+ path = path.path
+ full_ref_path = path
+ if not cls._common_path_default:
+ return full_ref_path
+ if not path.startswith(cls._common_path_default+"/"):
+ full_ref_path = '%s/%s' % (cls._common_path_default, path)
+ return full_ref_path
+
+ @classmethod
+ def delete(cls, repo, path):
+ """Delete the reference at the given path
+
+ :param repo:
+ Repository to delete the reference from
+
+ :param path:
+ Short or full path pointing to the reference, i.e. refs/myreference
+ or just "myreference", hence 'refs/' is implied.
+ Alternatively the symbolic reference to be deleted"""
+ full_ref_path = cls.to_full_path(path)
+ abs_path = join(repo.git_dir, full_ref_path)
+ if exists(abs_path):
+ os.remove(abs_path)
+ else:
+ # check packed refs
+ pack_file_path = cls._get_packed_refs_path(repo)
+ try:
+ reader = open(pack_file_path)
+ except (OSError,IOError):
+ pass # it didnt exist at all
+ else:
+ new_lines = list()
+ made_change = False
+ dropped_last_line = False
+ for line in reader:
+ # keep line if it is a comment or if the ref to delete is not
+ # in the line
+ # If we deleted the last line and this one is a tag-reference object,
+ # we drop it as well
+ if ( line.startswith('#') or full_ref_path not in line ) and \
+ ( not dropped_last_line or dropped_last_line and not line.startswith('^') ):
+ new_lines.append(line)
+ dropped_last_line = False
+ continue
+ # END skip comments and lines without our path
+
+ # drop this line
+ made_change = True
+ dropped_last_line = True
+ # END for each line in packed refs
+ reader.close()
+
+ # write the new lines
+ if made_change:
+ open(pack_file_path, 'w').writelines(new_lines)
+ # END open exception handling
+ # END handle deletion
+
+ @classmethod
+ def _create(cls, repo, path, resolve, reference, force):
+ """internal method used to create a new symbolic reference.
+ If resolve is False,, the reference will be taken as is, creating
+ a proper symbolic reference. Otherwise it will be resolved to the
+ corresponding object and a detached symbolic reference will be created
+ instead"""
+ full_ref_path = cls.to_full_path(path)
+ abs_ref_path = join(repo.git_dir, full_ref_path)
+
+ # figure out target data
+ target = reference
+ if resolve:
+ target = repo.rev_parse(str(reference))
+
+ if not force and isfile(abs_ref_path):
+ target_data = str(target)
+ if isinstance(target, SymbolicReference):
+ target_data = target.path
+ if not resolve:
+ target_data = "ref: " + target_data
+ if open(abs_ref_path, 'rb').read().strip() != target_data:
+ raise OSError("Reference at %s does already exist" % full_ref_path)
+ # END no force handling
+
+ ref = cls(repo, full_ref_path)
+ ref.reference = target
+ return ref
+
+ @classmethod
+ def create(cls, repo, path, reference='HEAD', force=False ):
+ """Create a new symbolic reference, hence a reference pointing to another reference.
+
+ :param repo:
+ Repository to create the reference in
+
+ :param path:
+ full path at which the new symbolic reference is supposed to be
+ created at, i.e. "NEW_HEAD" or "symrefs/my_new_symref"
+
+ :param reference:
+ The reference to which the new symbolic reference should point to
+
+ :param force:
+ if True, force creation even if a symbolic reference with that name already exists.
+ Raise OSError otherwise
+
+ :return: Newly created symbolic Reference
+
+ :raise OSError:
+ If a (Symbolic)Reference with the same name but different contents
+ already exists.
+
+ :note: This does not alter the current HEAD, index or Working Tree"""
+ return cls._create(repo, path, False, reference, force)
+
+ def rename(self, new_path, force=False):
+ """Rename self to a new path
+
+ :param new_path:
+ Either a simple name or a full path, i.e. new_name or features/new_name.
+ The prefix refs/ is implied for references and will be set as needed.
+ In case this is a symbolic ref, there is no implied prefix
+
+ :param force:
+ If True, the rename will succeed even if a head with the target name
+ already exists. It will be overwritten in that case
+
+ :return: self
+ :raise OSError: In case a file at path but a different contents already exists """
+ new_path = self.to_full_path(new_path)
+ if self.path == new_path:
+ return self
+
+ new_abs_path = join(self.repo.git_dir, new_path)
+ cur_abs_path = join(self.repo.git_dir, self.path)
+ if isfile(new_abs_path):
+ if not force:
+ # if they point to the same file, its not an error
+ if open(new_abs_path,'rb').read().strip() != open(cur_abs_path,'rb').read().strip():
+ raise OSError("File at path %r already exists" % new_abs_path)
+ # else: we could remove ourselves and use the otherone, but
+ # but clarity we just continue as usual
+ # END not force handling
+ os.remove(new_abs_path)
+ # END handle existing target file
+
+ dname = dirname(new_abs_path)
+ if not isdir(dname):
+ os.makedirs(dname)
+ # END create directory
+
+ rename(cur_abs_path, new_abs_path)
+ self.path = new_path
+
+ return self
+
+ @classmethod
+ def _iter_items(cls, repo, common_path = None):
+ if common_path is None:
+ common_path = cls._common_path_default
+ rela_paths = set()
+
+ # walk loose refs
+ # Currently we do not follow links
+ for root, dirs, files in os.walk(join_path_native(repo.git_dir, common_path)):
+ if 'refs/' not in root: # skip non-refs subfolders
+ refs_id = [ i for i,d in enumerate(dirs) if d == 'refs' ]
+ if refs_id:
+ dirs[0:] = ['refs']
+ # END prune non-refs folders
+
+ for f in files:
+ abs_path = to_native_path_linux(join_path(root, f))
+ rela_paths.add(abs_path.replace(to_native_path_linux(repo.git_dir) + '/', ""))
+ # END for each file in root directory
+ # END for each directory to walk
+
+ # read packed refs
+ for sha, rela_path in cls._iter_packed_refs(repo):
+ if rela_path.startswith(common_path):
+ rela_paths.add(rela_path)
+ # END relative path matches common path
+ # END packed refs reading
+
+ # return paths in sorted order
+ for path in sorted(rela_paths):
+ try:
+ yield cls.from_path(repo, path)
+ except ValueError:
+ continue
+ # END for each sorted relative refpath
+
+ @classmethod
+ def iter_items(cls, repo, common_path = None):
+ """Find all refs in the repository
+
+ :param repo: is the Repo
+
+ :param common_path:
+ Optional keyword argument to the path which is to be shared by all
+ returned Ref objects.
+ Defaults to class specific portion if None assuring that only
+ refs suitable for the actual class are returned.
+
+ :return:
+ git.SymbolicReference[], each of them is guaranteed to be a symbolic
+ ref which is not detached.
+
+ List is lexigraphically sorted
+ The returned objects represent actual subclasses, such as Head or TagReference"""
+ return ( r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached )
+
+ @classmethod
+ def from_path(cls, repo, path):
+ """
+ :param path: full .git-directory-relative path name to the Reference to instantiate
+ :note: use to_full_path() if you only have a partial path of a known Reference Type
+ :return:
+ Instance of type Reference, Head, or Tag
+ depending on the given path"""
+ if not path:
+ raise ValueError("Cannot create Reference from %r" % path)
+
+ for ref_type in (HEAD, Head, RemoteReference, TagReference, Reference, SymbolicReference):
+ try:
+ instance = ref_type(repo, path)
+ if instance.__class__ == SymbolicReference and instance.is_detached:
+ raise ValueError("SymbolRef was detached, we drop it")
+ return instance
+ except ValueError:
+ pass
+ # END exception handling
+ # END for each type to try
+ raise ValueError("Could not find reference type suitable to handle path %r" % path)