diff options
Diffstat (limited to 'git/test/test_remote.py')
-rw-r--r-- | git/test/test_remote.py | 243 |
1 files changed, 120 insertions, 123 deletions
diff --git a/git/test/test_remote.py b/git/test/test_remote.py index b1248096..a5a73ce1 100644 --- a/git/test/test_remote.py +++ b/git/test/test_remote.py @@ -12,17 +12,19 @@ import shutil import os import random -# assure we have repeatable results +# assure we have repeatable results random.seed(0) + class TestRemoteProgress(RemoteProgress): - __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' ) + __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages') + def __init__(self): super(TestRemoteProgress, self).__init__() self._seen_lines = list() self._stages_per_op = dict() self._num_progress_messages = 0 - + def _parse_progress_line(self, line): # we may remove the line later if it is dropped # Keep it for debugging @@ -30,37 +32,36 @@ class TestRemoteProgress(RemoteProgress): rval = super(TestRemoteProgress, self)._parse_progress_line(line) assert len(line) > 1, "line %r too short" % line return rval - + def line_dropped(self, line): try: self._seen_lines.remove(line) except ValueError: pass - + def update(self, op_code, cur_count, max_count=None, message=''): # check each stage only comes once op_id = op_code & self.OP_MASK assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - + self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - - if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): + self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK) + + if op_code & (self.WRITING | self.END) == (self.WRITING | self.END): assert message # END check we get message - + self._num_progress_messages += 1 - - + def make_assertion(self): # we don't always receive messages if not self._seen_lines: return - + # sometimes objects are not compressed which is okay - assert len(self._seen_ops) in (2,3) + assert len(self._seen_ops) in (2, 3) assert self._stages_per_op - + # must have seen all stages for op, stages in self._stages_per_op.items(): assert stages & self.STAGE_MASK == self.STAGE_MASK @@ -68,15 +69,14 @@ class TestRemoteProgress(RemoteProgress): def assert_received_message(self): assert self._num_progress_messages - + class TestRemote(TestBase): - + def _print_fetchhead(self, repo): fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) fp.close() - - + def _do_test_fetch_result(self, results, remote): # self._print_fetchhead(remote.repo) assert len(results) > 0 and isinstance(results[0], FetchInfo) @@ -84,15 +84,15 @@ class TestRemote(TestBase): assert isinstance(info.note, basestring) if isinstance(info.ref, Reference): assert info.flags != 0 - # END reference type flags handling + # END reference type flags handling assert isinstance(info.ref, (SymbolicReference, Reference)) - if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD): + if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD): assert isinstance(info.old_commit, Commit) else: assert info.old_commit is None - # END forced update checking + # END forced update checking # END for each info - + def _do_test_push_result(self, results, remote): assert len(results) > 0 and isinstance(results[0], PushInfo) for info in results: @@ -108,32 +108,31 @@ class TestRemote(TestBase): assert has_one else: # there must be a remote commit - if info.flags & info.DELETED == 0: + if info.flags & info.DELETED == 0: assert isinstance(info.local_ref, Reference) else: assert info.local_ref is None assert type(info.remote_ref) in (TagReference, RemoteReference) # END error checking - # END for each info - - + # END for each info + def _do_test_fetch_info(self, repo): self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - + def _commit_random_file(self, repo): #Create a file with a random name and random data and commit it to repo. # Return the commited absolute file path index = repo.index - new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) + new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo) index.add([new_file]) index.commit("Committing %s" % new_file) return new_file - - def _do_test_fetch(self,remote, rw_repo, remote_repo): + + def _do_test_fetch(self, remote, rw_repo, remote_repo): # specialized fetch testing to de-clutter the main test self._do_test_fetch_info(rw_repo) - + def fetch_and_test(remote, **kwargs): progress = TestRemoteProgress() kwargs['progress'] = progress @@ -142,60 +141,60 @@ class TestRemote(TestBase): self._do_test_fetch_result(res, remote) return res # END fetch and check - + def get_info(res, remote, name): - return res["%s/%s"%(remote,name)] - + return res["%s/%s" % (remote, name)] + # put remote head to master as it is garantueed to exist remote_repo.head.reference = remote_repo.heads.master - + res = fetch_and_test(remote) # all uptodate for info in res: assert info.flags & info.HEAD_UPTODATE - + # rewind remote head to trigger rejection # index must be false as remote is a bare repo rhead = remote_repo.head remote_commit = rhead.commit rhead.reset("HEAD~2", index=False) res = fetch_and_test(remote) - mkey = "%s/%s"%(remote,'master') + mkey = "%s/%s" % (remote, 'master') master_info = res[mkey] assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - + # normal fast forward - set head back to previous one rhead.commit = remote_commit res = fetch_and_test(remote) assert res[mkey].flags & FetchInfo.FAST_FORWARD - + # new remote branch new_remote_branch = Head.create(remote_repo, "new_branch") res = fetch_and_test(remote) new_branch_info = get_info(res, remote, new_remote_branch) assert new_branch_info.flags & FetchInfo.NEW_HEAD - + # remote branch rename ( causes creation of a new one locally ) new_remote_branch.rename("other_branch_name") res = fetch_and_test(remote) other_branch_info = get_info(res, remote, new_remote_branch) assert other_branch_info.ref.commit == new_branch_info.ref.commit - + # remove new branch Head.delete(new_remote_branch.repo, new_remote_branch) res = fetch_and_test(remote) # deleted remote will not be fetched self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - + # prune stale tracking branches stale_refs = remote.stale_refs assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) RemoteReference.delete(rw_repo, *stale_refs) - + # test single branch fetch with refspec including target remote - res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) + res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote) assert len(res) == 1 and get_info(res, remote, 'master') - + # ... with respec and no target res = fetch_and_test(remote, refspec='master') assert len(res) == 1 @@ -203,27 +202,27 @@ class TestRemote(TestBase): # ... multiple refspecs res = fetch_and_test(remote, refspec=['master', 'fred']) assert len(res) == 1 - + # add new tag reference rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit assert tinfo.flags & tinfo.NEW_TAG - + # adjust tag commit Reference.set_object(rtag, rhead.commit.parents[0].parents[0]) res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert tinfo.commit == rtag.commit assert tinfo.flags & tinfo.TAG_UPDATE - + # delete remote tag - local one will stay TagReference.delete(remote_repo, rtag) res = fetch_and_test(remote, tags=True) self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - - # provoke to receive actual objects to see what kind of output we have to + + # provoke to receive actual objects to see what kind of output we have to # expect. For that we need a remote transport protocol # Create a new UN-shared repo and fetch into it after we pushed a change # to the shared repo @@ -231,31 +230,31 @@ class TestRemote(TestBase): # must clone with a local path for the repo implementation not to freak out # as it wants local paths only ( which I can understand ) other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = "git://localhost%s"%remote_repo.git_dir - + remote_repo_url = "git://localhost%s" % remote_repo.git_dir + # put origin to git-url - other_origin = other_repo.remotes.origin + other_origin = other_repo.remotes.origin other_origin.config_writer.set("url", remote_repo_url) # it automatically creates alternates as remote_repo is shared as well. # It will use the transport though and ignore alternates when fetching # assert not other_repo.alternates # this would fail - + # assure we are in the right state rw_repo.head.reset(remote.refs.master, working_tree=True) try: self._commit_random_file(rw_repo) remote.push(rw_repo.head.reference) - - # here I would expect to see remote-information about packing - # objects and so on. Unfortunately, this does not happen + + # here I would expect to see remote-information about packing + # objects and so on. Unfortunately, this does not happen # if we are redirecting the output - git explicitly checks for this # and only provides progress information to ttys res = fetch_and_test(other_origin) finally: shutil.rmtree(other_repo_dir) # END test and cleanup - - def _assert_push_and_pull(self,remote, rw_repo, remote_repo): + + def _assert_push_and_pull(self, remote, rw_repo, remote_repo): # push our changes lhead = rw_repo.head lindex = rw_repo.index @@ -263,16 +262,16 @@ class TestRemote(TestBase): try: lhead.reference = rw_repo.heads.master except AttributeError: - # if the author is on a non-master branch, the clones might not have + # if the author is on a non-master branch, the clones might not have # a local master yet. We simply create it lhead.reference = rw_repo.create_head('master') - # END master handling + # END master handling lhead.reset(remote.refs.master, working_tree=True) - + # push without spec should fail ( without further configuration ) # well, works nicely # self.failUnlessRaises(GitCommandError, remote.push) - + # simple file push self._commit_random_file(rw_repo) progress = TestRemoteProgress() @@ -280,25 +279,25 @@ class TestRemote(TestBase): assert isinstance(res, IterableList) self._do_test_push_result(res, remote) progress.make_assertion() - + # rejected - undo last commit lhead.reset("HEAD~1") res = remote.push(lhead.reference) - assert res[0].flags & PushInfo.ERROR + assert res[0].flags & PushInfo.ERROR assert res[0].flags & PushInfo.REJECTED self._do_test_push_result(res, remote) - + # force rejected pull res = remote.push('+%s' % lhead.reference) - assert res[0].flags & PushInfo.ERROR == 0 + assert res[0].flags & PushInfo.ERROR == 0 assert res[0].flags & PushInfo.FORCED_UPDATE self._do_test_push_result(res, remote) - + # invalid refspec res = remote.push("hellothere") assert len(res) == 0 - - # push new tags + + # push new tags progress = TestRemoteProgress() to_be_updated = "my_tag.1.0RV" new_tag = TagReference.create(rw_repo, to_be_updated) @@ -307,26 +306,26 @@ class TestRemote(TestBase): assert res[-1].flags & PushInfo.NEW_TAG progress.make_assertion() self._do_test_push_result(res, remote) - + # update push new tags # Rejection is default new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) res = remote.push(tags=True) self._do_test_push_result(res, remote) assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - + # push force this tag res = remote.push("+%s" % new_tag.path) assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - + # delete tag - have to do it using refspec res = remote.push(":%s" % new_tag.path) self._do_test_push_result(res, remote) assert res[0].flags & PushInfo.DELETED - # Currently progress is not properly transferred, especially not using + # Currently progress is not properly transferred, especially not using # the git daemon # progress.assert_received_message() - + # push new branch new_head = Head.create(rw_repo, "my_new_branch") progress = TestRemoteProgress() @@ -334,45 +333,45 @@ class TestRemote(TestBase): assert res[0].flags & PushInfo.NEW_HEAD progress.make_assertion() self._do_test_push_result(res, remote) - + # delete new branch on the remote end and locally res = remote.push(":%s" % new_head.path) self._do_test_push_result(res, remote) Head.delete(rw_repo, new_head) assert res[-1].flags & PushInfo.DELETED - + # --all res = remote.push(all=True) self._do_test_push_result(res, remote) - + remote.pull('master') - - # cleanup - delete created tags and branches as we are in an innerloop on + + # cleanup - delete created tags and branches as we are in an innerloop on # the same repository TagReference.delete(rw_repo, new_tag, other_tag) remote.push(":%s" % other_tag.path) - + @with_rw_and_rw_remote_repo('0.1.6') def test_base(self, rw_repo, remote_repo): num_remotes = 0 remote_set = set() ran_fetch_test = False - + for remote in rw_repo.remotes: num_remotes += 1 assert remote == remote assert str(remote) != repr(remote) remote_set.add(remote) remote_set.add(remote) # should already exist - - # REFS + + # REFS refs = remote.refs assert refs for ref in refs: assert ref.remote_name == remote.name assert ref.remote_head # END for each ref - + # OPTIONS # cannot use 'fetch' key anymore as it is now a method for opt in ("url", ): @@ -380,10 +379,10 @@ class TestRemote(TestBase): reader = remote.config_reader assert reader.get(opt) == val assert reader.get_value(opt, None) == val - + # unable to write with a reader self.failUnlessRaises(IOError, reader.set, opt, "test") - + # change value writer = remote.config_writer new_val = "myval" @@ -393,9 +392,9 @@ class TestRemote(TestBase): assert writer.get(opt) == val del(writer) assert getattr(remote, opt) == val - # END for each default option key - - # RENAME + # END for each default option key + + # RENAME other_name = "totally_other_name" prev_name = remote.name assert remote.rename(other_name) == remote @@ -404,98 +403,96 @@ class TestRemote(TestBase): for time in range(2): assert remote.rename(prev_name).name == prev_name # END for each rename ( back to prev_name ) - + # PUSH/PULL TESTING self._assert_push_and_pull(remote, rw_repo, remote_repo) - + # FETCH TESTING - # Only for remotes - local cases are the same or less complicated + # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted if remote.name == "daemon_origin": self._do_test_fetch(remote, rw_repo, remote_repo) ran_fetch_test = True - # END fetch test - + # END fetch test + remote.update() # END for each remote - + assert ran_fetch_test assert num_remotes assert num_remotes == len(remote_set) - + origin = rw_repo.remote('origin') assert origin == rw_repo.remotes.origin - + @with_rw_repo('HEAD', bare=True) def test_creation_and_removal(self, bare_rw_repo): new_name = "test_new_one" arg_list = (new_name, "git@server:hello.git") - remote = Remote.create(bare_rw_repo, *arg_list ) + remote = Remote.create(bare_rw_repo, *arg_list) assert remote.name == "test_new_one" assert remote in bare_rw_repo.remotes - + # create same one again self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - + Remote.remove(bare_rw_repo, new_name) - + for remote in bare_rw_repo.remotes: if remote.name == new_name: raise AssertionError("Remote removal failed") # END if deleted remote matches existing remote's name # END for each remote - + def test_fetch_info(self): # assure we can handle remote-tracking branches fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython" remote_info_line_fmt = "* [new branch] nomatter -> %s" fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "local/master", + remote_info_line_fmt % "local/master", fetch_info_line_fmt % 'remote-tracking branch') assert fi.ref.is_valid() assert fi.ref.commit - + # handles non-default refspecs: One can specify a different path in refs/remotes # or a special path just in refs/something for instance - + fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "subdir/tagname", + remote_info_line_fmt % "subdir/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/tags') - + # it could be in a remote direcftory though fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/tags/tagname", + remote_info_line_fmt % "remotename/tags/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/remotes/') - + # it can also be anywhere ! tag_path = "refs/something/remotename/tags/tagname" fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % tag_path, + remote_info_line_fmt % tag_path, fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path == tag_path - + # branches default to refs/remotes fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/branch", + remote_info_line_fmt % "remotename/branch", fetch_info_line_fmt % 'branch') - + assert isinstance(fi.ref, RemoteReference) assert fi.ref.remote_name == 'remotename' - + # but you can force it anywhere, in which case we only have a references fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "refs/something/branch", + remote_info_line_fmt % "refs/something/branch", fetch_info_line_fmt % 'branch') - + assert type(fi.ref) is Reference assert fi.ref.path == "refs/something/branch" - - |