diff options
Diffstat (limited to 'git/test/test_remote.py')
-rw-r--r-- | git/test/test_remote.py | 164 |
1 files changed, 81 insertions, 83 deletions
diff --git a/git/test/test_remote.py b/git/test/test_remote.py index b1248096..0b7ab574 100644 --- a/git/test/test_remote.py +++ b/git/test/test_remote.py @@ -22,7 +22,7 @@ class TestRemoteProgress(RemoteProgress): self._seen_lines = list() self._stages_per_op = dict() self._num_progress_messages = 0 - + def _parse_progress_line(self, line): # we may remove the line later if it is dropped # Keep it for debugging @@ -30,37 +30,37 @@ class TestRemoteProgress(RemoteProgress): rval = super(TestRemoteProgress, self)._parse_progress_line(line) assert len(line) > 1, "line %r too short" % line return rval - + def line_dropped(self, line): try: self._seen_lines.remove(line) except ValueError: pass - + def update(self, op_code, cur_count, max_count=None, message=''): # check each stage only comes once op_id = op_code & self.OP_MASK assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - + self._stages_per_op.setdefault(op_id, 0) self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - + if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): assert message # END check we get message - + self._num_progress_messages += 1 - - + + def make_assertion(self): # we don't always receive messages if not self._seen_lines: return - + # sometimes objects are not compressed which is okay assert len(self._seen_ops) in (2,3) assert self._stages_per_op - + # must have seen all stages for op, stages in self._stages_per_op.items(): assert stages & self.STAGE_MASK == self.STAGE_MASK @@ -68,15 +68,15 @@ class TestRemoteProgress(RemoteProgress): def assert_received_message(self): assert self._num_progress_messages - + class TestRemote(TestBase): - + def _print_fetchhead(self, repo): fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) fp.close() - - + + def _do_test_fetch_result(self, results, remote): # self._print_fetchhead(remote.repo) assert len(results) > 0 and isinstance(results[0], FetchInfo) @@ -92,7 +92,7 @@ class TestRemote(TestBase): assert info.old_commit is None # END forced update checking # END for each info - + def _do_test_push_result(self, results, remote): assert len(results) > 0 and isinstance(results[0], PushInfo) for info in results: @@ -115,12 +115,12 @@ class TestRemote(TestBase): assert type(info.remote_ref) in (TagReference, RemoteReference) # END error checking # END for each info - - + + def _do_test_fetch_info(self, repo): self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - + def _commit_random_file(self, repo): #Create a file with a random name and random data and commit it to repo. # Return the commited absolute file path @@ -129,11 +129,11 @@ class TestRemote(TestBase): index.add([new_file]) index.commit("Committing %s" % new_file) return new_file - + def _do_test_fetch(self,remote, rw_repo, remote_repo): # specialized fetch testing to de-clutter the main test self._do_test_fetch_info(rw_repo) - + def fetch_and_test(remote, **kwargs): progress = TestRemoteProgress() kwargs['progress'] = progress @@ -142,18 +142,18 @@ class TestRemote(TestBase): self._do_test_fetch_result(res, remote) return res # END fetch and check - + def get_info(res, remote, name): return res["%s/%s"%(remote,name)] - + # put remote head to master as it is garantueed to exist remote_repo.head.reference = remote_repo.heads.master - + res = fetch_and_test(remote) # all uptodate for info in res: assert info.flags & info.HEAD_UPTODATE - + # rewind remote head to trigger rejection # index must be false as remote is a bare repo rhead = remote_repo.head @@ -163,39 +163,39 @@ class TestRemote(TestBase): mkey = "%s/%s"%(remote,'master') master_info = res[mkey] assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - + # normal fast forward - set head back to previous one rhead.commit = remote_commit res = fetch_and_test(remote) assert res[mkey].flags & FetchInfo.FAST_FORWARD - + # new remote branch new_remote_branch = Head.create(remote_repo, "new_branch") res = fetch_and_test(remote) new_branch_info = get_info(res, remote, new_remote_branch) assert new_branch_info.flags & FetchInfo.NEW_HEAD - + # remote branch rename ( causes creation of a new one locally ) new_remote_branch.rename("other_branch_name") res = fetch_and_test(remote) other_branch_info = get_info(res, remote, new_remote_branch) assert other_branch_info.ref.commit == new_branch_info.ref.commit - + # remove new branch Head.delete(new_remote_branch.repo, new_remote_branch) res = fetch_and_test(remote) # deleted remote will not be fetched self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - + # prune stale tracking branches stale_refs = remote.stale_refs assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) RemoteReference.delete(rw_repo, *stale_refs) - + # test single branch fetch with refspec including target remote res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) assert len(res) == 1 and get_info(res, remote, 'master') - + # ... with respec and no target res = fetch_and_test(remote, refspec='master') assert len(res) == 1 @@ -203,26 +203,26 @@ class TestRemote(TestBase): # ... multiple refspecs res = fetch_and_test(remote, refspec=['master', 'fred']) assert len(res) == 1 - + # add new tag reference rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit assert tinfo.flags & tinfo.NEW_TAG - + # adjust tag commit Reference.set_object(rtag, rhead.commit.parents[0].parents[0]) res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert tinfo.commit == rtag.commit assert tinfo.flags & tinfo.TAG_UPDATE - + # delete remote tag - local one will stay TagReference.delete(remote_repo, rtag) res = fetch_and_test(remote, tags=True) self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - + # provoke to receive actual objects to see what kind of output we have to # expect. For that we need a remote transport protocol # Create a new UN-shared repo and fetch into it after we pushed a change @@ -232,20 +232,20 @@ class TestRemote(TestBase): # as it wants local paths only ( which I can understand ) other_repo = remote_repo.clone(other_repo_dir, shared=False) remote_repo_url = "git://localhost%s"%remote_repo.git_dir - + # put origin to git-url other_origin = other_repo.remotes.origin other_origin.config_writer.set("url", remote_repo_url) # it automatically creates alternates as remote_repo is shared as well. # It will use the transport though and ignore alternates when fetching # assert not other_repo.alternates # this would fail - + # assure we are in the right state rw_repo.head.reset(remote.refs.master, working_tree=True) try: self._commit_random_file(rw_repo) remote.push(rw_repo.head.reference) - + # here I would expect to see remote-information about packing # objects and so on. Unfortunately, this does not happen # if we are redirecting the output - git explicitly checks for this @@ -254,7 +254,7 @@ class TestRemote(TestBase): finally: shutil.rmtree(other_repo_dir) # END test and cleanup - + def _assert_push_and_pull(self,remote, rw_repo, remote_repo): # push our changes lhead = rw_repo.head @@ -268,11 +268,11 @@ class TestRemote(TestBase): lhead.reference = rw_repo.create_head('master') # END master handling lhead.reset(remote.refs.master, working_tree=True) - + # push without spec should fail ( without further configuration ) # well, works nicely # self.failUnlessRaises(GitCommandError, remote.push) - + # simple file push self._commit_random_file(rw_repo) progress = TestRemoteProgress() @@ -280,24 +280,24 @@ class TestRemote(TestBase): assert isinstance(res, IterableList) self._do_test_push_result(res, remote) progress.make_assertion() - + # rejected - undo last commit lhead.reset("HEAD~1") res = remote.push(lhead.reference) assert res[0].flags & PushInfo.ERROR assert res[0].flags & PushInfo.REJECTED self._do_test_push_result(res, remote) - + # force rejected pull res = remote.push('+%s' % lhead.reference) assert res[0].flags & PushInfo.ERROR == 0 assert res[0].flags & PushInfo.FORCED_UPDATE self._do_test_push_result(res, remote) - + # invalid refspec res = remote.push("hellothere") assert len(res) == 0 - + # push new tags progress = TestRemoteProgress() to_be_updated = "my_tag.1.0RV" @@ -307,18 +307,18 @@ class TestRemote(TestBase): assert res[-1].flags & PushInfo.NEW_TAG progress.make_assertion() self._do_test_push_result(res, remote) - + # update push new tags # Rejection is default new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) res = remote.push(tags=True) self._do_test_push_result(res, remote) assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - + # push force this tag res = remote.push("+%s" % new_tag.path) assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - + # delete tag - have to do it using refspec res = remote.push(":%s" % new_tag.path) self._do_test_push_result(res, remote) @@ -326,7 +326,7 @@ class TestRemote(TestBase): # Currently progress is not properly transferred, especially not using # the git daemon # progress.assert_received_message() - + # push new branch new_head = Head.create(rw_repo, "my_new_branch") progress = TestRemoteProgress() @@ -334,37 +334,37 @@ class TestRemote(TestBase): assert res[0].flags & PushInfo.NEW_HEAD progress.make_assertion() self._do_test_push_result(res, remote) - + # delete new branch on the remote end and locally res = remote.push(":%s" % new_head.path) self._do_test_push_result(res, remote) Head.delete(rw_repo, new_head) assert res[-1].flags & PushInfo.DELETED - + # --all res = remote.push(all=True) self._do_test_push_result(res, remote) - + remote.pull('master') - + # cleanup - delete created tags and branches as we are in an innerloop on # the same repository TagReference.delete(rw_repo, new_tag, other_tag) remote.push(":%s" % other_tag.path) - + @with_rw_and_rw_remote_repo('0.1.6') def test_base(self, rw_repo, remote_repo): num_remotes = 0 remote_set = set() ran_fetch_test = False - + for remote in rw_repo.remotes: num_remotes += 1 assert remote == remote assert str(remote) != repr(remote) remote_set.add(remote) remote_set.add(remote) # should already exist - + # REFS refs = remote.refs assert refs @@ -372,7 +372,7 @@ class TestRemote(TestBase): assert ref.remote_name == remote.name assert ref.remote_head # END for each ref - + # OPTIONS # cannot use 'fetch' key anymore as it is now a method for opt in ("url", ): @@ -380,10 +380,10 @@ class TestRemote(TestBase): reader = remote.config_reader assert reader.get(opt) == val assert reader.get_value(opt, None) == val - + # unable to write with a reader self.failUnlessRaises(IOError, reader.set, opt, "test") - + # change value writer = remote.config_writer new_val = "myval" @@ -394,7 +394,7 @@ class TestRemote(TestBase): del(writer) assert getattr(remote, opt) == val # END for each default option key - + # RENAME other_name = "totally_other_name" prev_name = remote.name @@ -404,10 +404,10 @@ class TestRemote(TestBase): for time in range(2): assert remote.rename(prev_name).name == prev_name # END for each rename ( back to prev_name ) - + # PUSH/PULL TESTING self._assert_push_and_pull(remote, rw_repo, remote_repo) - + # FETCH TESTING # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted @@ -415,17 +415,17 @@ class TestRemote(TestBase): self._do_test_fetch(remote, rw_repo, remote_repo) ran_fetch_test = True # END fetch test - + remote.update() # END for each remote - + assert ran_fetch_test assert num_remotes assert num_remotes == len(remote_set) - + origin = rw_repo.remote('origin') assert origin == rw_repo.remotes.origin - + @with_rw_repo('HEAD', bare=True) def test_creation_and_removal(self, bare_rw_repo): new_name = "test_new_one" @@ -433,18 +433,18 @@ class TestRemote(TestBase): remote = Remote.create(bare_rw_repo, *arg_list ) assert remote.name == "test_new_one" assert remote in bare_rw_repo.remotes - + # create same one again self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - + Remote.remove(bare_rw_repo, new_name) - + for remote in bare_rw_repo.remotes: if remote.name == new_name: raise AssertionError("Remote removal failed") # END if deleted remote matches existing remote's name # END for each remote - + def test_fetch_info(self): # assure we can handle remote-tracking branches fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython" @@ -454,48 +454,46 @@ class TestRemote(TestBase): fetch_info_line_fmt % 'remote-tracking branch') assert fi.ref.is_valid() assert fi.ref.commit - + # handles non-default refspecs: One can specify a different path in refs/remotes # or a special path just in refs/something for instance - + fi = FetchInfo._from_line(self.rorepo, remote_info_line_fmt % "subdir/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/tags') - + # it could be in a remote direcftory though fi = FetchInfo._from_line(self.rorepo, remote_info_line_fmt % "remotename/tags/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/remotes/') - + # it can also be anywhere ! tag_path = "refs/something/remotename/tags/tagname" fi = FetchInfo._from_line(self.rorepo, remote_info_line_fmt % tag_path, fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path == tag_path - + # branches default to refs/remotes fi = FetchInfo._from_line(self.rorepo, remote_info_line_fmt % "remotename/branch", fetch_info_line_fmt % 'branch') - + assert isinstance(fi.ref, RemoteReference) assert fi.ref.remote_name == 'remotename' - + # but you can force it anywhere, in which case we only have a references fi = FetchInfo._from_line(self.rorepo, remote_info_line_fmt % "refs/something/branch", fetch_info_line_fmt % 'branch') - + assert type(fi.ref) is Reference assert fi.ref.path == "refs/something/branch" - - |