diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-11-21 19:19:04 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-11-21 19:19:04 +0100 |
commit | e6019e16d5a74dc49eb7129ee7fd78b4de51dac2 (patch) | |
tree | d63ba4a29087506bb1906307f202d11e22c6979c /test/git/test_remote.py | |
parent | fafe8a77db75083de3e7af92185ecdb7f2d542d3 (diff) | |
download | gitpython-e6019e16d5a74dc49eb7129ee7fd78b4de51dac2.tar.gz |
flattened test folder structure, didn't adjust any file content yet
Diffstat (limited to 'test/git/test_remote.py')
-rw-r--r-- | test/git/test_remote.py | 445 |
1 files changed, 0 insertions, 445 deletions
diff --git a/test/git/test_remote.py b/test/git/test_remote.py deleted file mode 100644 index c52f907e..00000000 --- a/test/git/test_remote.py +++ /dev/null @@ -1,445 +0,0 @@ -# test_remote.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * -from git.util import IterableList -import tempfile -import shutil -import os -import random - -# assure we have repeatable results -random.seed(0) - -class TestRemoteProgress(RemoteProgress): - __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' ) - def __init__(self): - super(TestRemoteProgress, self).__init__() - self._seen_lines = list() - self._stages_per_op = dict() - self._num_progress_messages = 0 - - def _parse_progress_line(self, line): - # we may remove the line later if it is dropped - # Keep it for debugging - self._seen_lines.append(line) - rval = super(TestRemoteProgress, self)._parse_progress_line(line) - assert len(line) > 1, "line %r too short" % line - return rval - - def line_dropped(self, line): - try: - self._seen_lines.remove(line) - except ValueError: - pass - - def update(self, op_code, cur_count, max_count=None, message=''): - # check each stage only comes once - op_id = op_code & self.OP_MASK - assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - - self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - - if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): - assert message - # END check we get message - - self._num_progress_messages += 1 - - - def make_assertion(self): - # we don't always receive messages - if not self._seen_lines: - return - - # sometimes objects are not compressed which is okay - assert len(self._seen_ops) in (2,3) - assert self._stages_per_op - - # must have seen all stages - for op, stages in self._stages_per_op.items(): - assert stages & self.STAGE_MASK == self.STAGE_MASK - # END for each op/stage - - def assert_received_message(self): - assert self._num_progress_messages - - -class TestRemote(TestBase): - - def _print_fetchhead(self, repo): - fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) - fp.close() - - - def _do_test_fetch_result(self, results, remote): - # self._print_fetchhead(remote.repo) - assert len(results) > 0 and isinstance(results[0], FetchInfo) - for info in results: - assert isinstance(info.note, basestring) - if isinstance(info.ref, Reference): - assert info.flags != 0 - # END reference type flags handling - assert isinstance(info.ref, (SymbolicReference, Reference)) - if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD): - assert isinstance(info.old_commit, Commit) - else: - assert info.old_commit is None - # END forced update checking - # END for each info - - def _do_test_push_result(self, results, remote): - assert len(results) > 0 and isinstance(results[0], PushInfo) - for info in results: - assert info.flags - assert isinstance(info.summary, basestring) - if info.old_commit is not None: - assert isinstance(info.old_commit, Commit) - if info.flags & info.ERROR: - has_one = False - for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE): - has_one |= bool(info.flags & bitflag) - # END for each bitflag - assert has_one - else: - # there must be a remote commit - if info.flags & info.DELETED == 0: - assert isinstance(info.local_ref, Reference) - else: - assert info.local_ref is None - assert type(info.remote_ref) in (TagReference, RemoteReference) - # END error checking - # END for each info - - - def _do_test_fetch_info(self, repo): - self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') - self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - - def _commit_random_file(self, repo): - #Create a file with a random name and random data and commit it to repo. - # Return the commited absolute file path - index = repo.index - new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) - index.add([new_file]) - index.commit("Committing %s" % new_file) - return new_file - - def _do_test_fetch(self,remote, rw_repo, remote_repo): - # specialized fetch testing to de-clutter the main test - self._do_test_fetch_info(rw_repo) - - def fetch_and_test(remote, **kwargs): - progress = TestRemoteProgress() - kwargs['progress'] = progress - res = remote.fetch(**kwargs) - progress.make_assertion() - self._do_test_fetch_result(res, remote) - return res - # END fetch and check - - def get_info(res, remote, name): - return res["%s/%s"%(remote,name)] - - # put remote head to master as it is garantueed to exist - remote_repo.head.reference = remote_repo.heads.master - - res = fetch_and_test(remote) - # all uptodate - for info in res: - assert info.flags & info.HEAD_UPTODATE - - # rewind remote head to trigger rejection - # index must be false as remote is a bare repo - rhead = remote_repo.head - remote_commit = rhead.commit - rhead.reset("HEAD~2", index=False) - res = fetch_and_test(remote) - mkey = "%s/%s"%(remote,'master') - master_info = res[mkey] - assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - - # normal fast forward - set head back to previous one - rhead.commit = remote_commit - res = fetch_and_test(remote) - assert res[mkey].flags & FetchInfo.FAST_FORWARD - - # new remote branch - new_remote_branch = Head.create(remote_repo, "new_branch") - res = fetch_and_test(remote) - new_branch_info = get_info(res, remote, new_remote_branch) - assert new_branch_info.flags & FetchInfo.NEW_HEAD - - # remote branch rename ( causes creation of a new one locally ) - new_remote_branch.rename("other_branch_name") - res = fetch_and_test(remote) - other_branch_info = get_info(res, remote, new_remote_branch) - assert other_branch_info.ref.commit == new_branch_info.ref.commit - - # remove new branch - Head.delete(new_remote_branch.repo, new_remote_branch) - res = fetch_and_test(remote) - # deleted remote will not be fetched - self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - - # prune stale tracking branches - stale_refs = remote.stale_refs - assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) - RemoteReference.delete(rw_repo, *stale_refs) - - # test single branch fetch with refspec including target remote - res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) - assert len(res) == 1 and get_info(res, remote, 'master') - - # ... with respec and no target - res = fetch_and_test(remote, refspec='master') - assert len(res) == 1 - - # add new tag reference - rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") - res = fetch_and_test(remote, tags=True) - tinfo = res[str(rtag)] - assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit - assert tinfo.flags & tinfo.NEW_TAG - - # adjust tag commit - Reference._set_object(rtag, rhead.commit.parents[0].parents[0]) - res = fetch_and_test(remote, tags=True) - tinfo = res[str(rtag)] - assert tinfo.commit == rtag.commit - assert tinfo.flags & tinfo.TAG_UPDATE - - # delete remote tag - local one will stay - TagReference.delete(remote_repo, rtag) - res = fetch_and_test(remote, tags=True) - self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - - # provoke to receive actual objects to see what kind of output we have to - # expect. For that we need a remote transport protocol - # Create a new UN-shared repo and fetch into it after we pushed a change - # to the shared repo - other_repo_dir = tempfile.mktemp("other_repo") - # must clone with a local path for the repo implementation not to freak out - # as it wants local paths only ( which I can understand ) - other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = "git://localhost%s"%remote_repo.git_dir - - # put origin to git-url - other_origin = other_repo.remotes.origin - other_origin.config_writer.set("url", remote_repo_url) - # it automatically creates alternates as remote_repo is shared as well. - # It will use the transport though and ignore alternates when fetching - # assert not other_repo.alternates # this would fail - - # assure we are in the right state - rw_repo.head.reset(remote.refs.master, working_tree=True) - try: - self._commit_random_file(rw_repo) - remote.push(rw_repo.head.reference) - - # here I would expect to see remote-information about packing - # objects and so on. Unfortunately, this does not happen - # if we are redirecting the output - git explicitly checks for this - # and only provides progress information to ttys - res = fetch_and_test(other_origin) - finally: - shutil.rmtree(other_repo_dir) - # END test and cleanup - - def _test_push_and_pull(self,remote, rw_repo, remote_repo): - # push our changes - lhead = rw_repo.head - lindex = rw_repo.index - # assure we are on master and it is checked out where the remote is - try: - lhead.reference = rw_repo.heads.master - except AttributeError: - # if the author is on a non-master branch, the clones might not have - # a local master yet. We simply create it - lhead.reference = rw_repo.create_head('master') - # END master handling - lhead.reset(remote.refs.master, working_tree=True) - - # push without spec should fail ( without further configuration ) - # well, works nicely - # self.failUnlessRaises(GitCommandError, remote.push) - - # simple file push - self._commit_random_file(rw_repo) - progress = TestRemoteProgress() - res = remote.push(lhead.reference, progress) - assert isinstance(res, IterableList) - self._do_test_push_result(res, remote) - progress.make_assertion() - - # rejected - undo last commit - lhead.reset("HEAD~1") - res = remote.push(lhead.reference) - assert res[0].flags & PushInfo.ERROR - assert res[0].flags & PushInfo.REJECTED - self._do_test_push_result(res, remote) - - # force rejected pull - res = remote.push('+%s' % lhead.reference) - assert res[0].flags & PushInfo.ERROR == 0 - assert res[0].flags & PushInfo.FORCED_UPDATE - self._do_test_push_result(res, remote) - - # invalid refspec - res = remote.push("hellothere") - assert len(res) == 0 - - # push new tags - progress = TestRemoteProgress() - to_be_updated = "my_tag.1.0RV" - new_tag = TagReference.create(rw_repo, to_be_updated) - other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message") - res = remote.push(progress=progress, tags=True) - assert res[-1].flags & PushInfo.NEW_TAG - progress.make_assertion() - self._do_test_push_result(res, remote) - - # update push new tags - # Rejection is default - new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) - res = remote.push(tags=True) - self._do_test_push_result(res, remote) - assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - - # push force this tag - res = remote.push("+%s" % new_tag.path) - assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - - # delete tag - have to do it using refspec - res = remote.push(":%s" % new_tag.path) - self._do_test_push_result(res, remote) - assert res[0].flags & PushInfo.DELETED - # Currently progress is not properly transferred, especially not using - # the git daemon - # progress.assert_received_message() - - # push new branch - new_head = Head.create(rw_repo, "my_new_branch") - progress = TestRemoteProgress() - res = remote.push(new_head, progress) - assert res[0].flags & PushInfo.NEW_HEAD - progress.make_assertion() - self._do_test_push_result(res, remote) - - # delete new branch on the remote end and locally - res = remote.push(":%s" % new_head.path) - self._do_test_push_result(res, remote) - Head.delete(rw_repo, new_head) - assert res[-1].flags & PushInfo.DELETED - - # --all - res = remote.push(all=True) - self._do_test_push_result(res, remote) - - remote.pull('master') - - # cleanup - delete created tags and branches as we are in an innerloop on - # the same repository - TagReference.delete(rw_repo, new_tag, other_tag) - remote.push(":%s" % other_tag.path) - - @with_rw_and_rw_remote_repo('0.1.6') - def test_base(self, rw_repo, remote_repo): - num_remotes = 0 - remote_set = set() - ran_fetch_test = False - - for remote in rw_repo.remotes: - num_remotes += 1 - assert remote == remote - assert str(remote) != repr(remote) - remote_set.add(remote) - remote_set.add(remote) # should already exist - - # REFS - refs = remote.refs - assert refs - for ref in refs: - assert ref.remote_name == remote.name - assert ref.remote_head - # END for each ref - - # OPTIONS - # cannot use 'fetch' key anymore as it is now a method - for opt in ("url", ): - val = getattr(remote, opt) - reader = remote.config_reader - assert reader.get(opt) == val - assert reader.get_value(opt, None) == val - - # unable to write with a reader - self.failUnlessRaises(IOError, reader.set, opt, "test") - - # change value - writer = remote.config_writer - new_val = "myval" - writer.set(opt, new_val) - assert writer.get(opt) == new_val - writer.set(opt, val) - assert writer.get(opt) == val - del(writer) - assert getattr(remote, opt) == val - # END for each default option key - - # RENAME - other_name = "totally_other_name" - prev_name = remote.name - assert remote.rename(other_name) == remote - assert prev_name != remote.name - # multiple times - for time in range(2): - assert remote.rename(prev_name).name == prev_name - # END for each rename ( back to prev_name ) - - # PUSH/PULL TESTING - self._test_push_and_pull(remote, rw_repo, remote_repo) - - # FETCH TESTING - # Only for remotes - local cases are the same or less complicated - # as additional progress information will never be emitted - if remote.name == "daemon_origin": - self._do_test_fetch(remote, rw_repo, remote_repo) - ran_fetch_test = True - # END fetch test - - remote.update() - # END for each remote - - assert ran_fetch_test - assert num_remotes - assert num_remotes == len(remote_set) - - origin = rw_repo.remote('origin') - assert origin == rw_repo.remotes.origin - - @with_rw_repo('HEAD', bare=True) - def test_creation_and_removal(self, bare_rw_repo): - new_name = "test_new_one" - arg_list = (new_name, "git@server:hello.git") - remote = Remote.create(bare_rw_repo, *arg_list ) - assert remote.name == "test_new_one" - assert remote in bare_rw_repo.remotes - - # create same one again - self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - - Remote.remove(bare_rw_repo, new_name) - - for remote in bare_rw_repo.remotes: - if remote.name == new_name: - raise AssertionError("Remote removal failed") - # END if deleted remote matches existing remote's name - # END for each remote - - - |