diff options
Diffstat (limited to 'git/test/test_remote.py')
-rw-r--r-- | git/test/test_remote.py | 228 |
1 files changed, 112 insertions, 116 deletions
diff --git a/git/test/test_remote.py b/git/test/test_remote.py index 87fcd7fe..18cfda07 100644 --- a/git/test/test_remote.py +++ b/git/test/test_remote.py @@ -5,21 +5,21 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php from git.test.lib import ( - TestBase, - with_rw_and_rw_remote_repo, - with_rw_repo, - ) + TestBase, + with_rw_and_rw_remote_repo, + with_rw_repo, +) from git.util import IterableList from git.db.interface import PushInfo, FetchInfo, RemoteProgress from git.remote import * -from git.exc import GitCommandError +from git.exc import GitCommandError from git.refs import ( - Reference, - TagReference, - RemoteReference, - Head, - SymbolicReference - ) + Reference, + TagReference, + RemoteReference, + Head, + SymbolicReference +) from nose import SkipTest @@ -28,54 +28,55 @@ import shutil import os import random -# assure we have repeatable results +# assure we have repeatable results random.seed(0) + class TestRemoteProgress(RemoteProgress): - __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages') + __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages') + def __init__(self): super(TestRemoteProgress, self).__init__() self._seen_lines = list() self._stages_per_op = dict() self._seen_ops = set() self._num_progress_messages = 0 - + def line_dropped(self, line): try: self._seen_lines.remove(line) except ValueError: pass - + def __call__(self, message, input=''): pass - + def update(self, op_code, cur_count, max_count=None, message='', input=''): # check each stage only comes once if input: self._seen_lines.append(input) - #END handle input + # END handle input op_id = op_code & self.OP_MASK assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - + self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - - if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): + self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK) + + if op_code & (self.WRITING | self.END) == (self.WRITING | self.END): assert message # END check we get message - + self._num_progress_messages += 1 - - + def make_assertion(self): # we don't always receive messages if not self._seen_lines: return - + # sometimes objects are not compressed which is okay - assert len(self._stages_per_op.keys()) in (2,3) + assert len(self._stages_per_op.keys()) in (2, 3) assert self._stages_per_op - + # must have seen all stages for op, stages in self._stages_per_op.items(): assert stages & self.STAGE_MASK == self.STAGE_MASK @@ -83,15 +84,14 @@ class TestRemoteProgress(RemoteProgress): def assert_received_message(self): assert self._num_progress_messages - + class TestRemote(TestBase): - + def _print_fetchhead(self, repo): fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) fp.close() - - + def _do_test_fetch_result(self, results, remote): # self._print_fetchhead(remote.repo) assert len(results) > 0 and isinstance(results[0], FetchInfo) @@ -99,15 +99,15 @@ class TestRemote(TestBase): assert isinstance(info.note, basestring) if isinstance(info.ref, Reference): assert info.flags != 0 - # END reference type flags handling + # END reference type flags handling assert isinstance(info.ref, (SymbolicReference, Reference)) - if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD): + if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD): assert isinstance(info.old_commit_binsha, str) and len(info.old_commit_binsha) == 20 else: assert info.old_commit_binsha is None - # END forced update checking + # END forced update checking # END for each info - + def _do_test_push_result(self, results, remote): assert len(results) > 0 and isinstance(results[0], PushInfo) for info in results: @@ -123,24 +123,24 @@ class TestRemote(TestBase): assert has_one else: # there must be a remote commit - if info.flags & info.DELETED == 0: + if info.flags & info.DELETED == 0: assert isinstance(info.local_ref, Reference) else: assert info.local_ref is None assert type(info.remote_ref) in (TagReference, RemoteReference) # END error checking - # END for each info - + # END for each info + def _commit_random_file(self, repo): - #Create a file with a random name and random data and commit it to repo. + # Create a file with a random name and random data and commit it to repo. # Return the commited absolute file path index = repo.index - new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) + new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo) index.add([new_file]) index.commit("Committing %s" % new_file) return new_file - - def _do_test_fetch(self,remote, rw_repo, remote_repo): + + def _do_test_fetch(self, remote, rw_repo, remote_repo): def fetch_and_test(remote, **kwargs): progress = TestRemoteProgress() kwargs['progress'] = progress @@ -149,84 +149,84 @@ class TestRemote(TestBase): self._do_test_fetch_result(res, remote) return res # END fetch and check - + def get_info(res, remote, name): - return res["%s/%s"%(remote,name)] - + return res["%s/%s" % (remote, name)] + # put remote head to master as it is garantueed to exist remote_repo.head.reference = remote_repo.heads.master - + res = fetch_and_test(remote) # all uptodate for info in res: assert info.flags & info.HEAD_UPTODATE - + # rewind remote head to trigger rejection # index must be false as remote is a bare repo rhead = remote_repo.head remote_commit = rhead.commit rhead.reset("HEAD~2", index=False) res = fetch_and_test(remote) - mkey = "%s/%s"%(remote,'master') + mkey = "%s/%s" % (remote, 'master') master_info = res[mkey] assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - + # normal fast forward - set head back to previous one rhead.commit = remote_commit res = fetch_and_test(remote) assert res[mkey].flags & FetchInfo.FAST_FORWARD - + # new remote branch new_remote_branch = Head.create(remote_repo, "new_branch") res = fetch_and_test(remote) new_branch_info = get_info(res, remote, new_remote_branch) assert new_branch_info.flags & FetchInfo.NEW_HEAD - + # remote branch rename ( causes creation of a new one locally ) new_remote_branch.rename("other_branch_name") res = fetch_and_test(remote) other_branch_info = get_info(res, remote, new_remote_branch) assert other_branch_info.ref.commit == new_branch_info.ref.commit - + # remove new branch Head.delete(new_remote_branch.repo, new_remote_branch) res = fetch_and_test(remote) # deleted remote will not be fetched self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - + # prune stale tracking branches stale_refs = remote.stale_refs assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) RemoteReference.delete(rw_repo, *stale_refs) - + # test single branch fetch with refspec including target remote - res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) + res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote) assert len(res) == 1 and get_info(res, remote, 'master') - + # ... with respec and no target res = fetch_and_test(remote, refspec='master') assert len(res) == 1 - + # add new tag reference rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit assert tinfo.flags & tinfo.NEW_TAG - + # adjust tag commit Reference.set_object(rtag, rhead.commit.parents[0].parents[0]) res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert tinfo.commit == rtag.commit assert tinfo.flags & tinfo.TAG_UPDATE - + # delete remote tag - local one will stay TagReference.delete(remote_repo, rtag) res = fetch_and_test(remote, tags=True) self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - - # provoke to receive actual objects to see what kind of output we have to + + # provoke to receive actual objects to see what kind of output we have to # expect. For that we need a remote transport protocol # Create a new UN-shared repo and fetch into it after we pushed a change # to the shared repo @@ -234,31 +234,31 @@ class TestRemote(TestBase): # must clone with a local path for the repo implementation not to freak out # as it wants local paths only ( which I can understand ) other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = "git://localhost%s"%remote_repo.git_dir - + remote_repo_url = "git://localhost%s" % remote_repo.git_dir + # put origin to git-url - other_origin = other_repo.remotes.origin + other_origin = other_repo.remotes.origin other_origin.config_writer.set("url", remote_repo_url) # it automatically creates alternates as remote_repo is shared as well. # It will use the transport though and ignore alternates when fetching # assert not other_repo.alternates # this would fail - + # assure we are in the right state rw_repo.head.reset(remote.refs.master, working_tree=True) try: self._commit_random_file(rw_repo) remote.push(rw_repo.head.reference) - - # here I would expect to see remote-information about packing - # objects and so on. Unfortunately, this does not happen + + # here I would expect to see remote-information about packing + # objects and so on. Unfortunately, this does not happen # if we are redirecting the output - git explicitly checks for this # and only provides progress information to ttys res = fetch_and_test(other_origin) finally: shutil.rmtree(other_repo_dir) # END test and cleanup - - def _verify_push_and_pull(self,remote, rw_repo, remote_repo): + + def _verify_push_and_pull(self, remote, rw_repo, remote_repo): # push our changes lhead = rw_repo.head lindex = rw_repo.index @@ -266,16 +266,16 @@ class TestRemote(TestBase): try: lhead.reference = rw_repo.heads.master except AttributeError: - # if the author is on a non-master branch, the clones might not have + # if the author is on a non-master branch, the clones might not have # a local master yet. We simply create it lhead.reference = rw_repo.create_head('master') - # END master handling + # END master handling lhead.reset(remote.refs.master, working_tree=True) - + # push without spec should fail ( without further configuration ) # well, works nicely # self.failUnlessRaises(GitCommandError, remote.push) - + # simple file push self._commit_random_file(rw_repo) progress = TestRemoteProgress() @@ -283,25 +283,25 @@ class TestRemote(TestBase): assert isinstance(res, IterableList) self._do_test_push_result(res, remote) progress.make_assertion() - + # rejected - undo last commit lhead.reset("HEAD~1") res = remote.push(lhead.reference) - assert res[0].flags & PushInfo.ERROR + assert res[0].flags & PushInfo.ERROR assert res[0].flags & PushInfo.REJECTED self._do_test_push_result(res, remote) - + # force rejected pull res = remote.push('+%s' % lhead.reference) - assert res[0].flags & PushInfo.ERROR == 0 + assert res[0].flags & PushInfo.ERROR == 0 assert res[0].flags & PushInfo.FORCED_UPDATE self._do_test_push_result(res, remote) - + # invalid refspec res = remote.push("hellothere") assert len(res) == 0 - - # push new tags + + # push new tags progress = TestRemoteProgress() to_be_updated = "my_tag.1.0RV" new_tag = TagReference.create(rw_repo, to_be_updated) @@ -310,26 +310,26 @@ class TestRemote(TestBase): assert res[-1].flags & PushInfo.NEW_TAG progress.make_assertion() self._do_test_push_result(res, remote) - + # update push new tags # Rejection is default new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) res = remote.push(tags=True) self._do_test_push_result(res, remote) assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - + # push force this tag res = remote.push("+%s" % new_tag.path) assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - + # delete tag - have to do it using refspec res = remote.push(":%s" % new_tag.path) self._do_test_push_result(res, remote) assert res[0].flags & PushInfo.DELETED - # Currently progress is not properly transferred, especially not using + # Currently progress is not properly transferred, especially not using # the git daemon # progress.assert_received_message() - + # push new branch new_head = Head.create(rw_repo, "my_new_branch") progress = TestRemoteProgress() @@ -337,20 +337,20 @@ class TestRemote(TestBase): assert res[0].flags & PushInfo.NEW_HEAD progress.make_assertion() self._do_test_push_result(res, remote) - + # delete new branch on the remote end and locally res = remote.push(":%s" % new_head.path) self._do_test_push_result(res, remote) Head.delete(rw_repo, new_head) assert res[-1].flags & PushInfo.DELETED - + # --all res = remote.push(all=True) self._do_test_push_result(res, remote) - + remote.pull('master') - - # cleanup - delete created tags and branches as we are in an innerloop on + + # cleanup - delete created tags and branches as we are in an innerloop on # the same repository TagReference.delete(rw_repo, new_tag, other_tag) remote.push(":%s" % other_tag.path) @@ -359,29 +359,28 @@ class TestRemote(TestBase): # If you see this, plesase remind yourself, that all this needs to be run # per repository type ! raise SkipTest("todo") - @with_rw_and_rw_remote_repo('0.1.6') def test_base(self, rw_repo, remote_repo): num_remotes = 0 remote_set = set() ran_fetch_test = False - + for remote in rw_repo.remotes: num_remotes += 1 assert remote == remote assert str(remote) != repr(remote) remote_set.add(remote) remote_set.add(remote) # should already exist - - # REFS + + # REFS refs = remote.refs assert refs for ref in refs: assert ref.remote_name == remote.name assert ref.remote_head # END for each ref - + # OPTIONS # cannot use 'fetch' key anymore as it is now a method for opt in ("url", ): @@ -389,10 +388,10 @@ class TestRemote(TestBase): reader = remote.config_reader assert reader.get(opt) == val assert reader.get_value(opt, None) == val - + # unable to write with a reader self.failUnlessRaises(IOError, reader.set, opt, "test") - + # change value writer = remote.config_writer new_val = "myval" @@ -402,9 +401,9 @@ class TestRemote(TestBase): assert writer.get(opt) == val del(writer) assert getattr(remote, opt) == val - # END for each default option key - - # RENAME + # END for each default option key + + # RENAME other_name = "totally_other_name" prev_name = remote.name assert remote.rename(other_name) == remote @@ -413,46 +412,43 @@ class TestRemote(TestBase): for time in range(2): assert remote.rename(prev_name).name == prev_name # END for each rename ( back to prev_name ) - + # PUSH/PULL TESTING self._verify_push_and_pull(remote, rw_repo, remote_repo) - + # FETCH TESTING - # Only for remotes - local cases are the same or less complicated + # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted if remote.name == "daemon_origin": self._do_test_fetch(remote, rw_repo, remote_repo) ran_fetch_test = True - # END fetch test - + # END fetch test + remote.update() # END for each remote - + assert ran_fetch_test assert num_remotes assert num_remotes == len(remote_set) - + origin = rw_repo.remote('origin') assert origin == rw_repo.remotes.origin - + @with_rw_repo('HEAD', bare=True) def test_creation_and_removal(self, bare_rw_repo): new_name = "test_new_one" arg_list = (new_name, "git@server:hello.git") - remote = Remote.create(bare_rw_repo, *arg_list ) + remote = Remote.create(bare_rw_repo, *arg_list) assert remote.name == "test_new_one" assert remote in bare_rw_repo.remotes - + # create same one again self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - + Remote.remove(bare_rw_repo, new_name) - + for remote in bare_rw_repo.remotes: if remote.name == new_name: raise AssertionError("Remote removal failed") # END if deleted remote matches existing remote's name # END for each remote - - - |