summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2011-07-04 23:26:13 +0200
committerSebastian Thiel <byronimo@gmail.com>2011-07-04 23:26:13 +0200
commit1ddf05a78475a194ed1aa082d26b3d27ecc77475 (patch)
treeb5727ac49ff7a8ac2a9560cf85de60d60ad10e8f
parente9bd04844725661c8aa2aef11091f01eeab69486 (diff)
parenta92ab8028c7780db728d6aad40ea1f511945fc8e (diff)
downloadgitpython-1ddf05a78475a194ed1aa082d26b3d27ecc77475.tar.gz
Merge branch 'remote-fixes' into 0.3
-rw-r--r--.gitmodules6
m---------git/ext/gitdb0
-rw-r--r--git/refs/reference.py51
-rw-r--r--git/refs/remote.py20
-rw-r--r--git/refs/symbolic.py1
-rw-r--r--git/remote.py30
-rw-r--r--git/test/test_refs.py5
-rw-r--r--git/test/test_remote.py900
8 files changed, 561 insertions, 452 deletions
diff --git a/.gitmodules b/.gitmodules
index c1e1e76d..5741d992 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,3 @@
-[submodule "gitdb"]
- path = git/ext/gitdb
- url = git://github.com/gitpython-developers/gitdb.git
+[submodule "gitdb"]
+ path = git/ext/gitdb
+ url = git://github.com/gitpython-developers/gitdb.git
diff --git a/git/ext/gitdb b/git/ext/gitdb
-Subproject 4524faf0d0c5383268b134084954b34faeaa766
+Subproject a5ed410aa0d3bed587214c3c017af2916b740da
diff --git a/git/refs/reference.py b/git/refs/reference.py
index 1a745ee9..29d051a6 100644
--- a/git/refs/reference.py
+++ b/git/refs/reference.py
@@ -13,6 +13,18 @@ from gitdb.util import (
__all__ = ["Reference"]
+#{ Utilities
+def require_remote_ref_path(func):
+ """A decorator raising a TypeError if we are not a valid remote, based on the path"""
+ def wrapper(self, *args):
+ if not self.path.startswith(self._remote_common_path_default + "/"):
+ raise ValueError("ref path does not point to a remote reference: %s" % path)
+ return func(self, *args)
+ #END wrapper
+ wrapper.__name__ = func.__name__
+ return wrapper
+#}END utilites
+
class Reference(SymbolicReference, LazyMixin, Iterable):
"""Represents a named reference to any object. Subclasses may apply restrictions though,
@@ -22,20 +34,24 @@ class Reference(SymbolicReference, LazyMixin, Iterable):
_resolve_ref_on_create = True
_common_path_default = "refs"
- def __init__(self, repo, path):
+ def __init__(self, repo, path, check_path = True):
"""Initialize this instance
:param repo: Our parent repository
:param path:
Path relative to the .git/ directory pointing to the ref in question, i.e.
- refs/heads/master"""
- if not path.startswith(self._common_path_default+'/'):
- raise ValueError("Cannot instantiate %r from path %s" % ( self.__class__.__name__, path ))
+ refs/heads/master
+ :param check_path: if False, you can provide any path. Otherwise the path must start with the
+ default path prefix of this type."""
+ if check_path and not path.startswith(self._common_path_default+'/'):
+ raise ValueError("Cannot instantiate %r from path %s" % (self.__class__.__name__, path))
super(Reference, self).__init__(repo, path)
def __str__(self):
return self.name
+
+ #{ Interface
def set_object(self, object, logmsg = None):
"""Special version which checks if the head-log needs an update as well"""
@@ -82,3 +98,30 @@ class Reference(SymbolicReference, LazyMixin, Iterable):
"""Equivalent to SymbolicReference.iter_items, but will return non-detached
references as well."""
return cls._iter_items(repo, common_path)
+
+ #}END interface
+
+
+ #{ Remote Interface
+
+ @property
+ @require_remote_ref_path
+ def remote_name(self):
+ """
+ :return:
+ Name of the remote we are a reference of, such as 'origin' for a reference
+ named 'origin/master'"""
+ tokens = self.path.split('/')
+ # /refs/remotes/<remote name>/<branch_name>
+ return tokens[2]
+
+ @property
+ @require_remote_ref_path
+ def remote_head(self):
+ """:return: Name of the remote head itself, i.e. master.
+ :note: The returned name is usually not qualified enough to uniquely identify
+ a branch"""
+ tokens = self.path.split('/')
+ return '/'.join(tokens[3:])
+
+ #} END remote interface
diff --git a/git/refs/remote.py b/git/refs/remote.py
index b7b07d4b..1d45d23f 100644
--- a/git/refs/remote.py
+++ b/git/refs/remote.py
@@ -10,7 +10,7 @@ __all__ = ["RemoteReference"]
class RemoteReference(Head):
"""Represents a reference pointing to a remote head."""
- _common_path_default = "refs/remotes"
+ _common_path_default = Head._remote_common_path_default
@classmethod
@@ -22,24 +22,6 @@ class RemoteReference(Head):
# END handle remote constraint
return super(RemoteReference, cls).iter_items(repo, common_path)
- @property
- def remote_name(self):
- """
- :return:
- Name of the remote we are a reference of, such as 'origin' for a reference
- named 'origin/master'"""
- tokens = self.path.split('/')
- # /refs/remotes/<remote name>/<branch_name>
- return tokens[2]
-
- @property
- def remote_head(self):
- """:return: Name of the remote head itself, i.e. master.
- :note: The returned name is usually not qualified enough to uniquely identify
- a branch"""
- tokens = self.path.split('/')
- return '/'.join(tokens[3:])
-
@classmethod
def delete(cls, repo, *refs, **kwargs):
"""Delete the given remote references.
diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py
index 65327a2f..8556a65e 100644
--- a/git/refs/symbolic.py
+++ b/git/refs/symbolic.py
@@ -33,6 +33,7 @@ class SymbolicReference(object):
_resolve_ref_on_create = False
_points_to_commits_only = True
_common_path_default = ""
+ _remote_common_path_default = "refs/remotes"
_id_attribute_ = "name"
def __init__(self, repo, path):
diff --git a/git/remote.py b/git/remote.py
index a3b91113..5e4439fb 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -271,18 +271,44 @@ class FetchInfo(object):
ref_type = None
if remote_local_ref == "FETCH_HEAD":
ref_type = SymbolicReference
- elif ref_type_name == "branch":
+ elif ref_type_name in ("remote-tracking", "branch"):
+ # note: remote-tracking is just the first part of the 'remote-tracking branch' token.
+ # We don't parse it correctly, but its enough to know what to do, and its new in git 1.7something
ref_type = RemoteReference
elif ref_type_name == "tag":
ref_type = TagReference
else:
raise TypeError("Cannot handle reference type: %r" % ref_type_name)
+ #END handle ref type
# create ref instance
if ref_type is SymbolicReference:
remote_local_ref = ref_type(repo, "FETCH_HEAD")
else:
- remote_local_ref = Reference.from_path(repo, join_path(ref_type._common_path_default, remote_local_ref.strip()))
+ # determine prefix. Tags are usually pulled into refs/tags, they may have subdirectories.
+ # It is not clear sometimes where exactly the item is, unless we have an absolute path as indicated
+ # by the 'ref/' prefix. Otherwise even a tag could be in refs/remotes, which is when it will have the
+ # 'tags/' subdirectory in its path.
+ # We don't want to test for actual existence, but try to figure everything out analytically.
+ ref_path = None
+ remote_local_ref = remote_local_ref.strip()
+ if remote_local_ref.startswith(Reference._common_path_default + "/"):
+ # always use actual type if we get absolute paths
+ # Will always be the case if something is fetched outside of refs/remotes (if its not a tag)
+ ref_path = remote_local_ref
+ if ref_type is not TagReference and not remote_local_ref.startswith(RemoteReference._common_path_default + "/"):
+ ref_type = Reference
+ #END downgrade remote reference
+ elif ref_type is TagReference and 'tags/' in remote_local_ref:
+ # even though its a tag, it is located in refs/remotes
+ ref_path = join_path(RemoteReference._common_path_default, remote_local_ref)
+ else:
+ ref_path = join_path(ref_type._common_path_default, remote_local_ref)
+ #END obtain refpath
+
+ # even though the path could be within the git conventions, we make
+ # sure we respect whatever the user wanted, and disabled path checking
+ remote_local_ref = ref_type(repo, ref_path, check_path=False)
# END create ref instance
note = ( note and note.strip() ) or ''
diff --git a/git/test/test_refs.py b/git/test/test_refs.py
index 092861da..d3b1f681 100644
--- a/git/test/test_refs.py
+++ b/git/test/test_refs.py
@@ -24,6 +24,11 @@ class TestRefs(TestBase):
assert isinstance(instance, ref_type)
# END for each name
# END for each type
+
+ # invalid path
+ self.failUnlessRaises(ValueError, TagReference, self.rorepo, "refs/invalid/tag")
+ # works without path check
+ TagReference(self.rorepo, "refs/invalid/tag", check_path=False)
def test_tag_base(self):
tag_object_refs = list()
diff --git a/git/test/test_remote.py b/git/test/test_remote.py
index af6915a3..3e9ba8b8 100644
--- a/git/test/test_remote.py
+++ b/git/test/test_remote.py
@@ -16,430 +16,482 @@ import random
random.seed(0)
class TestRemoteProgress(RemoteProgress):
- __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' )
- def __init__(self):
- super(TestRemoteProgress, self).__init__()
- self._seen_lines = list()
- self._stages_per_op = dict()
- self._num_progress_messages = 0
-
- def _parse_progress_line(self, line):
- # we may remove the line later if it is dropped
- # Keep it for debugging
- self._seen_lines.append(line)
- rval = super(TestRemoteProgress, self)._parse_progress_line(line)
- assert len(line) > 1, "line %r too short" % line
- return rval
-
- def line_dropped(self, line):
- try:
- self._seen_lines.remove(line)
- except ValueError:
- pass
-
- def update(self, op_code, cur_count, max_count=None, message=''):
- # check each stage only comes once
- op_id = op_code & self.OP_MASK
- assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
-
- self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
-
- if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
- assert message
- # END check we get message
-
- self._num_progress_messages += 1
-
-
- def make_assertion(self):
- # we don't always receive messages
- if not self._seen_lines:
- return
-
- # sometimes objects are not compressed which is okay
- assert len(self._seen_ops) in (2,3)
- assert self._stages_per_op
-
- # must have seen all stages
- for op, stages in self._stages_per_op.items():
- assert stages & self.STAGE_MASK == self.STAGE_MASK
- # END for each op/stage
+ __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' )
+ def __init__(self):
+ super(TestRemoteProgress, self).__init__()
+ self._seen_lines = list()
+ self._stages_per_op = dict()
+ self._num_progress_messages = 0
+
+ def _parse_progress_line(self, line):
+ # we may remove the line later if it is dropped
+ # Keep it for debugging
+ self._seen_lines.append(line)
+ rval = super(TestRemoteProgress, self)._parse_progress_line(line)
+ assert len(line) > 1, "line %r too short" % line
+ return rval
+
+ def line_dropped(self, line):
+ try:
+ self._seen_lines.remove(line)
+ except ValueError:
+ pass
+
+ def update(self, op_code, cur_count, max_count=None, message=''):
+ # check each stage only comes once
+ op_id = op_code & self.OP_MASK
+ assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
+
+ self._stages_per_op.setdefault(op_id, 0)
+ self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
+
+ if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
+ assert message
+ # END check we get message
+
+ self._num_progress_messages += 1
+
+
+ def make_assertion(self):
+ # we don't always receive messages
+ if not self._seen_lines:
+ return
+
+ # sometimes objects are not compressed which is okay
+ assert len(self._seen_ops) in (2,3)
+ assert self._stages_per_op
+
+ # must have seen all stages
+ for op, stages in self._stages_per_op.items():
+ assert stages & self.STAGE_MASK == self.STAGE_MASK
+ # END for each op/stage
- def assert_received_message(self):
- assert self._num_progress_messages
-
+ def assert_received_message(self):
+ assert self._num_progress_messages
+
class TestRemote(TestBase):
-
- def _print_fetchhead(self, repo):
- fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
- fp.close()
-
-
- def _do_test_fetch_result(self, results, remote):
- # self._print_fetchhead(remote.repo)
- assert len(results) > 0 and isinstance(results[0], FetchInfo)
- for info in results:
- assert isinstance(info.note, basestring)
- if isinstance(info.ref, Reference):
- assert info.flags != 0
- # END reference type flags handling
- assert isinstance(info.ref, (SymbolicReference, Reference))
- if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
- assert isinstance(info.old_commit, Commit)
- else:
- assert info.old_commit is None
- # END forced update checking
- # END for each info
-
- def _do_test_push_result(self, results, remote):
- assert len(results) > 0 and isinstance(results[0], PushInfo)
- for info in results:
- assert info.flags
- assert isinstance(info.summary, basestring)
- if info.old_commit is not None:
- assert isinstance(info.old_commit, Commit)
- if info.flags & info.ERROR:
- has_one = False
- for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE):
- has_one |= bool(info.flags & bitflag)
- # END for each bitflag
- assert has_one
- else:
- # there must be a remote commit
- if info.flags & info.DELETED == 0:
- assert isinstance(info.local_ref, Reference)
- else:
- assert info.local_ref is None
- assert type(info.remote_ref) in (TagReference, RemoteReference)
- # END error checking
- # END for each info
-
-
- def _do_test_fetch_info(self, repo):
- self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
- self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
- def _commit_random_file(self, repo):
- #Create a file with a random name and random data and commit it to repo.
- # Return the commited absolute file path
- index = repo.index
- new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
- index.add([new_file])
- index.commit("Committing %s" % new_file)
- return new_file
-
- def _do_test_fetch(self,remote, rw_repo, remote_repo):
- # specialized fetch testing to de-clutter the main test
- self._do_test_fetch_info(rw_repo)
-
- def fetch_and_test(remote, **kwargs):
- progress = TestRemoteProgress()
- kwargs['progress'] = progress
- res = remote.fetch(**kwargs)
- progress.make_assertion()
- self._do_test_fetch_result(res, remote)
- return res
- # END fetch and check
-
- def get_info(res, remote, name):
- return res["%s/%s"%(remote,name)]
-
- # put remote head to master as it is garantueed to exist
- remote_repo.head.reference = remote_repo.heads.master
-
- res = fetch_and_test(remote)
- # all uptodate
- for info in res:
- assert info.flags & info.HEAD_UPTODATE
-
- # rewind remote head to trigger rejection
- # index must be false as remote is a bare repo
- rhead = remote_repo.head
- remote_commit = rhead.commit
- rhead.reset("HEAD~2", index=False)
- res = fetch_and_test(remote)
- mkey = "%s/%s"%(remote,'master')
- master_info = res[mkey]
- assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
-
- # normal fast forward - set head back to previous one
- rhead.commit = remote_commit
- res = fetch_and_test(remote)
- assert res[mkey].flags & FetchInfo.FAST_FORWARD
-
- # new remote branch
- new_remote_branch = Head.create(remote_repo, "new_branch")
- res = fetch_and_test(remote)
- new_branch_info = get_info(res, remote, new_remote_branch)
- assert new_branch_info.flags & FetchInfo.NEW_HEAD
-
- # remote branch rename ( causes creation of a new one locally )
- new_remote_branch.rename("other_branch_name")
- res = fetch_and_test(remote)
- other_branch_info = get_info(res, remote, new_remote_branch)
- assert other_branch_info.ref.commit == new_branch_info.ref.commit
-
- # remove new branch
- Head.delete(new_remote_branch.repo, new_remote_branch)
- res = fetch_and_test(remote)
- # deleted remote will not be fetched
- self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
-
- # prune stale tracking branches
- stale_refs = remote.stale_refs
- assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
- RemoteReference.delete(rw_repo, *stale_refs)
-
- # test single branch fetch with refspec including target remote
- res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
- assert len(res) == 1 and get_info(res, remote, 'master')
-
- # ... with respec and no target
- res = fetch_and_test(remote, refspec='master')
- assert len(res) == 1
-
- # add new tag reference
- rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
- res = fetch_and_test(remote, tags=True)
- tinfo = res[str(rtag)]
- assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
- assert tinfo.flags & tinfo.NEW_TAG
-
- # adjust tag commit
- Reference.set_object(rtag, rhead.commit.parents[0].parents[0])
- res = fetch_and_test(remote, tags=True)
- tinfo = res[str(rtag)]
- assert tinfo.commit == rtag.commit
- assert tinfo.flags & tinfo.TAG_UPDATE
-
- # delete remote tag - local one will stay
- TagReference.delete(remote_repo, rtag)
- res = fetch_and_test(remote, tags=True)
- self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
-
- # provoke to receive actual objects to see what kind of output we have to
- # expect. For that we need a remote transport protocol
- # Create a new UN-shared repo and fetch into it after we pushed a change
- # to the shared repo
- other_repo_dir = tempfile.mktemp("other_repo")
- # must clone with a local path for the repo implementation not to freak out
- # as it wants local paths only ( which I can understand )
- other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = "git://localhost%s"%remote_repo.git_dir
-
- # put origin to git-url
- other_origin = other_repo.remotes.origin
- other_origin.config_writer.set("url", remote_repo_url)
- # it automatically creates alternates as remote_repo is shared as well.
- # It will use the transport though and ignore alternates when fetching
- # assert not other_repo.alternates # this would fail
-
- # assure we are in the right state
- rw_repo.head.reset(remote.refs.master, working_tree=True)
- try:
- self._commit_random_file(rw_repo)
- remote.push(rw_repo.head.reference)
-
- # here I would expect to see remote-information about packing
- # objects and so on. Unfortunately, this does not happen
- # if we are redirecting the output - git explicitly checks for this
- # and only provides progress information to ttys
- res = fetch_and_test(other_origin)
- finally:
- shutil.rmtree(other_repo_dir)
- # END test and cleanup
-
- def _test_push_and_pull(self,remote, rw_repo, remote_repo):
- # push our changes
- lhead = rw_repo.head
- lindex = rw_repo.index
- # assure we are on master and it is checked out where the remote is
- try:
- lhead.reference = rw_repo.heads.master
- except AttributeError:
- # if the author is on a non-master branch, the clones might not have
- # a local master yet. We simply create it
- lhead.reference = rw_repo.create_head('master')
- # END master handling
- lhead.reset(remote.refs.master, working_tree=True)
-
- # push without spec should fail ( without further configuration )
- # well, works nicely
- # self.failUnlessRaises(GitCommandError, remote.push)
-
- # simple file push
- self._commit_random_file(rw_repo)
- progress = TestRemoteProgress()
- res = remote.push(lhead.reference, progress)
- assert isinstance(res, IterableList)
- self._do_test_push_result(res, remote)
- progress.make_assertion()
-
- # rejected - undo last commit
- lhead.reset("HEAD~1")
- res = remote.push(lhead.reference)
- assert res[0].flags & PushInfo.ERROR
- assert res[0].flags & PushInfo.REJECTED
- self._do_test_push_result(res, remote)
-
- # force rejected pull
- res = remote.push('+%s' % lhead.reference)
- assert res[0].flags & PushInfo.ERROR == 0
- assert res[0].flags & PushInfo.FORCED_UPDATE
- self._do_test_push_result(res, remote)
-
- # invalid refspec
- res = remote.push("hellothere")
- assert len(res) == 0
-
- # push new tags
- progress = TestRemoteProgress()
- to_be_updated = "my_tag.1.0RV"
- new_tag = TagReference.create(rw_repo, to_be_updated)
- other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message")
- res = remote.push(progress=progress, tags=True)
- assert res[-1].flags & PushInfo.NEW_TAG
- progress.make_assertion()
- self._do_test_push_result(res, remote)
-
- # update push new tags
- # Rejection is default
- new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
- res = remote.push(tags=True)
- self._do_test_push_result(res, remote)
- assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
-
- # push force this tag
- res = remote.push("+%s" % new_tag.path)
- assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
-
- # delete tag - have to do it using refspec
- res = remote.push(":%s" % new_tag.path)
- self._do_test_push_result(res, remote)
- assert res[0].flags & PushInfo.DELETED
- # Currently progress is not properly transferred, especially not using
- # the git daemon
- # progress.assert_received_message()
-
- # push new branch
- new_head = Head.create(rw_repo, "my_new_branch")
- progress = TestRemoteProgress()
- res = remote.push(new_head, progress)
- assert res[0].flags & PushInfo.NEW_HEAD
- progress.make_assertion()
- self._do_test_push_result(res, remote)
-
- # delete new branch on the remote end and locally
- res = remote.push(":%s" % new_head.path)
- self._do_test_push_result(res, remote)
- Head.delete(rw_repo, new_head)
- assert res[-1].flags & PushInfo.DELETED
-
- # --all
- res = remote.push(all=True)
- self._do_test_push_result(res, remote)
-
- remote.pull('master')
-
- # cleanup - delete created tags and branches as we are in an innerloop on
- # the same repository
- TagReference.delete(rw_repo, new_tag, other_tag)
- remote.push(":%s" % other_tag.path)
-
- @with_rw_and_rw_remote_repo('0.1.6')
- def test_base(self, rw_repo, remote_repo):
- num_remotes = 0
- remote_set = set()
- ran_fetch_test = False
-
- for remote in rw_repo.remotes:
- num_remotes += 1
- assert remote == remote
- assert str(remote) != repr(remote)
- remote_set.add(remote)
- remote_set.add(remote) # should already exist
-
- # REFS
- refs = remote.refs
- assert refs
- for ref in refs:
- assert ref.remote_name == remote.name
- assert ref.remote_head
- # END for each ref
-
- # OPTIONS
- # cannot use 'fetch' key anymore as it is now a method
- for opt in ("url", ):
- val = getattr(remote, opt)
- reader = remote.config_reader
- assert reader.get(opt) == val
- assert reader.get_value(opt, None) == val
-
- # unable to write with a reader
- self.failUnlessRaises(IOError, reader.set, opt, "test")
-
- # change value
- writer = remote.config_writer
- new_val = "myval"
- writer.set(opt, new_val)
- assert writer.get(opt) == new_val
- writer.set(opt, val)
- assert writer.get(opt) == val
- del(writer)
- assert getattr(remote, opt) == val
- # END for each default option key
-
- # RENAME
- other_name = "totally_other_name"
- prev_name = remote.name
- assert remote.rename(other_name) == remote
- assert prev_name != remote.name
- # multiple times
- for time in range(2):
- assert remote.rename(prev_name).name == prev_name
- # END for each rename ( back to prev_name )
-
- # PUSH/PULL TESTING
- self._test_push_and_pull(remote, rw_repo, remote_repo)
-
- # FETCH TESTING
- # Only for remotes - local cases are the same or less complicated
- # as additional progress information will never be emitted
- if remote.name == "daemon_origin":
- self._do_test_fetch(remote, rw_repo, remote_repo)
- ran_fetch_test = True
- # END fetch test
-
- remote.update()
- # END for each remote
-
- assert ran_fetch_test
- assert num_remotes
- assert num_remotes == len(remote_set)
-
- origin = rw_repo.remote('origin')
- assert origin == rw_repo.remotes.origin
-
- @with_rw_repo('HEAD', bare=True)
- def test_creation_and_removal(self, bare_rw_repo):
- new_name = "test_new_one"
- arg_list = (new_name, "git@server:hello.git")
- remote = Remote.create(bare_rw_repo, *arg_list )
- assert remote.name == "test_new_one"
- assert remote in bare_rw_repo.remotes
-
- # create same one again
- self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
-
- Remote.remove(bare_rw_repo, new_name)
-
- for remote in bare_rw_repo.remotes:
- if remote.name == new_name:
- raise AssertionError("Remote removal failed")
- # END if deleted remote matches existing remote's name
- # END for each remote
-
-
-
+
+ def _print_fetchhead(self, repo):
+ fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
+ fp.close()
+
+
+ def _do_test_fetch_result(self, results, remote):
+ # self._print_fetchhead(remote.repo)
+ assert len(results) > 0 and isinstance(results[0], FetchInfo)
+ for info in results:
+ assert isinstance(info.note, basestring)
+ if isinstance(info.ref, Reference):
+ assert info.flags != 0
+ # END reference type flags handling
+ assert isinstance(info.ref, (SymbolicReference, Reference))
+ if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
+ assert isinstance(info.old_commit, Commit)
+ else:
+ assert info.old_commit is None
+ # END forced update checking
+ # END for each info
+
+ def _do_test_push_result(self, results, remote):
+ assert len(results) > 0 and isinstance(results[0], PushInfo)
+ for info in results:
+ assert info.flags
+ assert isinstance(info.summary, basestring)
+ if info.old_commit is not None:
+ assert isinstance(info.old_commit, Commit)
+ if info.flags & info.ERROR:
+ has_one = False
+ for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE):
+ has_one |= bool(info.flags & bitflag)
+ # END for each bitflag
+ assert has_one
+ else:
+ # there must be a remote commit
+ if info.flags & info.DELETED == 0:
+ assert isinstance(info.local_ref, Reference)
+ else:
+ assert info.local_ref is None
+ assert type(info.remote_ref) in (TagReference, RemoteReference)
+ # END error checking
+ # END for each info
+
+
+ def _do_test_fetch_info(self, repo):
+ self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
+ self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+
+ def _commit_random_file(self, repo):
+ #Create a file with a random name and random data and commit it to repo.
+ # Return the commited absolute file path
+ index = repo.index
+ new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
+ index.add([new_file])
+ index.commit("Committing %s" % new_file)
+ return new_file
+
+ def _do_test_fetch(self,remote, rw_repo, remote_repo):
+ # specialized fetch testing to de-clutter the main test
+ self._do_test_fetch_info(rw_repo)
+
+ def fetch_and_test(remote, **kwargs):
+ progress = TestRemoteProgress()
+ kwargs['progress'] = progress
+ res = remote.fetch(**kwargs)
+ progress.make_assertion()
+ self._do_test_fetch_result(res, remote)
+ return res
+ # END fetch and check
+
+ def get_info(res, remote, name):
+ return res["%s/%s"%(remote,name)]
+
+ # put remote head to master as it is garantueed to exist
+ remote_repo.head.reference = remote_repo.heads.master
+
+ res = fetch_and_test(remote)
+ # all uptodate
+ for info in res:
+ assert info.flags & info.HEAD_UPTODATE
+
+ # rewind remote head to trigger rejection
+ # index must be false as remote is a bare repo
+ rhead = remote_repo.head
+ remote_commit = rhead.commit
+ rhead.reset("HEAD~2", index=False)
+ res = fetch_and_test(remote)
+ mkey = "%s/%s"%(remote,'master')
+ master_info = res[mkey]
+ assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
+
+ # normal fast forward - set head back to previous one
+ rhead.commit = remote_commit
+ res = fetch_and_test(remote)
+ assert res[mkey].flags & FetchInfo.FAST_FORWARD
+
+ # new remote branch
+ new_remote_branch = Head.create(remote_repo, "new_branch")
+ res = fetch_and_test(remote)
+ new_branch_info = get_info(res, remote, new_remote_branch)
+ assert new_branch_info.flags & FetchInfo.NEW_HEAD
+
+ # remote branch rename ( causes creation of a new one locally )
+ new_remote_branch.rename("other_branch_name")
+ res = fetch_and_test(remote)
+ other_branch_info = get_info(res, remote, new_remote_branch)
+ assert other_branch_info.ref.commit == new_branch_info.ref.commit
+
+ # remove new branch
+ Head.delete(new_remote_branch.repo, new_remote_branch)
+ res = fetch_and_test(remote)
+ # deleted remote will not be fetched
+ self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
+
+ # prune stale tracking branches
+ stale_refs = remote.stale_refs
+ assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
+ RemoteReference.delete(rw_repo, *stale_refs)
+
+ # test single branch fetch with refspec including target remote
+ res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
+ assert len(res) == 1 and get_info(res, remote, 'master')
+
+ # ... with respec and no target
+ res = fetch_and_test(remote, refspec='master')
+ assert len(res) == 1
+
+ # add new tag reference
+ rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
+ res = fetch_and_test(remote, tags=True)
+ tinfo = res[str(rtag)]
+ assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
+ assert tinfo.flags & tinfo.NEW_TAG
+
+ # adjust tag commit
+ Reference.set_object(rtag, rhead.commit.parents[0].parents[0])
+ res = fetch_and_test(remote, tags=True)
+ tinfo = res[str(rtag)]
+ assert tinfo.commit == rtag.commit
+ assert tinfo.flags & tinfo.TAG_UPDATE
+
+ # delete remote tag - local one will stay
+ TagReference.delete(remote_repo, rtag)
+ res = fetch_and_test(remote, tags=True)
+ self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
+
+ # provoke to receive actual objects to see what kind of output we have to
+ # expect. For that we need a remote transport protocol
+ # Create a new UN-shared repo and fetch into it after we pushed a change
+ # to the shared repo
+ other_repo_dir = tempfile.mktemp("other_repo")
+ # must clone with a local path for the repo implementation not to freak out
+ # as it wants local paths only ( which I can understand )
+ other_repo = remote_repo.clone(other_repo_dir, shared=False)
+ remote_repo_url = "git://localhost%s"%remote_repo.git_dir
+
+ # put origin to git-url
+ other_origin = other_repo.remotes.origin
+ other_origin.config_writer.set("url", remote_repo_url)
+ # it automatically creates alternates as remote_repo is shared as well.
+ # It will use the transport though and ignore alternates when fetching
+ # assert not other_repo.alternates # this would fail
+
+ # assure we are in the right state
+ rw_repo.head.reset(remote.refs.master, working_tree=True)
+ try:
+ self._commit_random_file(rw_repo)
+ remote.push(rw_repo.head.reference)
+
+ # here I would expect to see remote-information about packing
+ # objects and so on. Unfortunately, this does not happen
+ # if we are redirecting the output - git explicitly checks for this
+ # and only provides progress information to ttys
+ res = fetch_and_test(other_origin)
+ finally:
+ shutil.rmtree(other_repo_dir)
+ # END test and cleanup
+
+ def _assert_push_and_pull(self,remote, rw_repo, remote_repo):
+ # push our changes
+ lhead = rw_repo.head
+ lindex = rw_repo.index
+ # assure we are on master and it is checked out where the remote is
+ try:
+ lhead.reference = rw_repo.heads.master
+ except AttributeError:
+ # if the author is on a non-master branch, the clones might not have
+ # a local master yet. We simply create it
+ lhead.reference = rw_repo.create_head('master')
+ # END master handling
+ lhead.reset(remote.refs.master, working_tree=True)
+
+ # push without spec should fail ( without further configuration )
+ # well, works nicely
+ # self.failUnlessRaises(GitCommandError, remote.push)
+
+ # simple file push
+ self._commit_random_file(rw_repo)
+ progress = TestRemoteProgress()
+ res = remote.push(lhead.reference, progress)
+ assert isinstance(res, IterableList)
+ self._do_test_push_result(res, remote)
+ progress.make_assertion()
+
+ # rejected - undo last commit
+ lhead.reset("HEAD~1")
+ res = remote.push(lhead.reference)
+ assert res[0].flags & PushInfo.ERROR
+ assert res[0].flags & PushInfo.REJECTED
+ self._do_test_push_result(res, remote)
+
+ # force rejected pull
+ res = remote.push('+%s' % lhead.reference)
+ assert res[0].flags & PushInfo.ERROR == 0
+ assert res[0].flags & PushInfo.FORCED_UPDATE
+ self._do_test_push_result(res, remote)
+
+ # invalid refspec
+ res = remote.push("hellothere")
+ assert len(res) == 0
+
+ # push new tags
+ progress = TestRemoteProgress()
+ to_be_updated = "my_tag.1.0RV"
+ new_tag = TagReference.create(rw_repo, to_be_updated)
+ other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message")
+ res = remote.push(progress=progress, tags=True)
+ assert res[-1].flags & PushInfo.NEW_TAG
+ progress.make_assertion()
+ self._do_test_push_result(res, remote)
+
+ # update push new tags
+ # Rejection is default
+ new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
+ res = remote.push(tags=True)
+ self._do_test_push_result(res, remote)
+ assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
+
+ # push force this tag
+ res = remote.push("+%s" % new_tag.path)
+ assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
+
+ # delete tag - have to do it using refspec
+ res = remote.push(":%s" % new_tag.path)
+ self._do_test_push_result(res, remote)
+ assert res[0].flags & PushInfo.DELETED
+ # Currently progress is not properly transferred, especially not using
+ # the git daemon
+ # progress.assert_received_message()
+
+ # push new branch
+ new_head = Head.create(rw_repo, "my_new_branch")
+ progress = TestRemoteProgress()
+ res = remote.push(new_head, progress)
+ assert res[0].flags & PushInfo.NEW_HEAD
+ progress.make_assertion()
+ self._do_test_push_result(res, remote)
+
+ # delete new branch on the remote end and locally
+ res = remote.push(":%s" % new_head.path)
+ self._do_test_push_result(res, remote)
+ Head.delete(rw_repo, new_head)
+ assert res[-1].flags & PushInfo.DELETED
+
+ # --all
+ res = remote.push(all=True)
+ self._do_test_push_result(res, remote)
+
+ remote.pull('master')
+
+ # cleanup - delete created tags and branches as we are in an innerloop on
+ # the same repository
+ TagReference.delete(rw_repo, new_tag, other_tag)
+ remote.push(":%s" % other_tag.path)
+
+ @with_rw_and_rw_remote_repo('0.1.6')
+ def test_base(self, rw_repo, remote_repo):
+ num_remotes = 0
+ remote_set = set()
+ ran_fetch_test = False
+
+ for remote in rw_repo.remotes:
+ num_remotes += 1
+ assert remote == remote
+ assert str(remote) != repr(remote)
+ remote_set.add(remote)
+ remote_set.add(remote) # should already exist
+
+ # REFS
+ refs = remote.refs
+ assert refs
+ for ref in refs:
+ assert ref.remote_name == remote.name
+ assert ref.remote_head
+ # END for each ref
+
+ # OPTIONS
+ # cannot use 'fetch' key anymore as it is now a method
+ for opt in ("url", ):
+ val = getattr(remote, opt)
+ reader = remote.config_reader
+ assert reader.get(opt) == val
+ assert reader.get_value(opt, None) == val
+
+ # unable to write with a reader
+ self.failUnlessRaises(IOError, reader.set, opt, "test")
+
+ # change value
+ writer = remote.config_writer
+ new_val = "myval"
+ writer.set(opt, new_val)
+ assert writer.get(opt) == new_val
+ writer.set(opt, val)
+ assert writer.get(opt) == val
+ del(writer)
+ assert getattr(remote, opt) == val
+ # END for each default option key
+
+ # RENAME
+ other_name = "totally_other_name"
+ prev_name = remote.name
+ assert remote.rename(other_name) == remote
+ assert prev_name != remote.name
+ # multiple times
+ for time in range(2):
+ assert remote.rename(prev_name).name == prev_name
+ # END for each rename ( back to prev_name )
+
+ # PUSH/PULL TESTING
+ self._assert_push_and_pull(remote, rw_repo, remote_repo)
+
+ # FETCH TESTING
+ # Only for remotes - local cases are the same or less complicated
+ # as additional progress information will never be emitted
+ if remote.name == "daemon_origin":
+ self._do_test_fetch(remote, rw_repo, remote_repo)
+ ran_fetch_test = True
+ # END fetch test
+
+ remote.update()
+ # END for each remote
+
+ assert ran_fetch_test
+ assert num_remotes
+ assert num_remotes == len(remote_set)
+
+ origin = rw_repo.remote('origin')
+ assert origin == rw_repo.remotes.origin
+
+ @with_rw_repo('HEAD', bare=True)
+ def test_creation_and_removal(self, bare_rw_repo):
+ new_name = "test_new_one"
+ arg_list = (new_name, "git@server:hello.git")
+ remote = Remote.create(bare_rw_repo, *arg_list )
+ assert remote.name == "test_new_one"
+ assert remote in bare_rw_repo.remotes
+
+ # create same one again
+ self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
+
+ Remote.remove(bare_rw_repo, new_name)
+
+ for remote in bare_rw_repo.remotes:
+ if remote.name == new_name:
+ raise AssertionError("Remote removal failed")
+ # END if deleted remote matches existing remote's name
+ # END for each remote
+
+ def test_fetch_info(self):
+ # assure we can handle remote-tracking branches
+ fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
+ remote_info_line_fmt = "* [new branch] nomatter -> %s"
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "local/master",
+ fetch_info_line_fmt % 'remote-tracking branch')
+ assert fi.ref.is_valid()
+ assert fi.ref.commit
+
+ # handles non-default refspecs: One can specify a different path in refs/remotes
+ # or a special path just in refs/something for instance
+
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "subdir/tagname",
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path.startswith('refs/tags')
+
+ # it could be in a remote direcftory though
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "remotename/tags/tagname",
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path.startswith('refs/remotes/')
+
+ # it can also be anywhere !
+ tag_path = "refs/something/remotename/tags/tagname"
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % tag_path,
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path == tag_path
+
+ # branches default to refs/remotes
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "remotename/branch",
+ fetch_info_line_fmt % 'branch')
+
+ assert isinstance(fi.ref, RemoteReference)
+ assert fi.ref.remote_name == 'remotename'
+
+ # but you can force it anywhere, in which case we only have a references
+ fi = FetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "refs/something/branch",
+ fetch_info_line_fmt % 'branch')
+
+ assert type(fi.ref) is Reference
+ assert fi.ref.path == "refs/something/branch"
+
+