summaryrefslogtreecommitdiff
path: root/git/test/test_remote.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/test/test_remote.py')
-rw-r--r--git/test/test_remote.py228
1 files changed, 112 insertions, 116 deletions
diff --git a/git/test/test_remote.py b/git/test/test_remote.py
index 87fcd7fe..18cfda07 100644
--- a/git/test/test_remote.py
+++ b/git/test/test_remote.py
@@ -5,21 +5,21 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from git.test.lib import (
- TestBase,
- with_rw_and_rw_remote_repo,
- with_rw_repo,
- )
+ TestBase,
+ with_rw_and_rw_remote_repo,
+ with_rw_repo,
+)
from git.util import IterableList
from git.db.interface import PushInfo, FetchInfo, RemoteProgress
from git.remote import *
-from git.exc import GitCommandError
+from git.exc import GitCommandError
from git.refs import (
- Reference,
- TagReference,
- RemoteReference,
- Head,
- SymbolicReference
- )
+ Reference,
+ TagReference,
+ RemoteReference,
+ Head,
+ SymbolicReference
+)
from nose import SkipTest
@@ -28,54 +28,55 @@ import shutil
import os
import random
-# assure we have repeatable results
+# assure we have repeatable results
random.seed(0)
+
class TestRemoteProgress(RemoteProgress):
- __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages')
+ __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages')
+
def __init__(self):
super(TestRemoteProgress, self).__init__()
self._seen_lines = list()
self._stages_per_op = dict()
self._seen_ops = set()
self._num_progress_messages = 0
-
+
def line_dropped(self, line):
try:
self._seen_lines.remove(line)
except ValueError:
pass
-
+
def __call__(self, message, input=''):
pass
-
+
def update(self, op_code, cur_count, max_count=None, message='', input=''):
# check each stage only comes once
if input:
self._seen_lines.append(input)
- #END handle input
+ # END handle input
op_id = op_code & self.OP_MASK
assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
-
+
self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
-
- if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
+ self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK)
+
+ if op_code & (self.WRITING | self.END) == (self.WRITING | self.END):
assert message
# END check we get message
-
+
self._num_progress_messages += 1
-
-
+
def make_assertion(self):
# we don't always receive messages
if not self._seen_lines:
return
-
+
# sometimes objects are not compressed which is okay
- assert len(self._stages_per_op.keys()) in (2,3)
+ assert len(self._stages_per_op.keys()) in (2, 3)
assert self._stages_per_op
-
+
# must have seen all stages
for op, stages in self._stages_per_op.items():
assert stages & self.STAGE_MASK == self.STAGE_MASK
@@ -83,15 +84,14 @@ class TestRemoteProgress(RemoteProgress):
def assert_received_message(self):
assert self._num_progress_messages
-
+
class TestRemote(TestBase):
-
+
def _print_fetchhead(self, repo):
fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
fp.close()
-
-
+
def _do_test_fetch_result(self, results, remote):
# self._print_fetchhead(remote.repo)
assert len(results) > 0 and isinstance(results[0], FetchInfo)
@@ -99,15 +99,15 @@ class TestRemote(TestBase):
assert isinstance(info.note, basestring)
if isinstance(info.ref, Reference):
assert info.flags != 0
- # END reference type flags handling
+ # END reference type flags handling
assert isinstance(info.ref, (SymbolicReference, Reference))
- if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
+ if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD):
assert isinstance(info.old_commit_binsha, str) and len(info.old_commit_binsha) == 20
else:
assert info.old_commit_binsha is None
- # END forced update checking
+ # END forced update checking
# END for each info
-
+
def _do_test_push_result(self, results, remote):
assert len(results) > 0 and isinstance(results[0], PushInfo)
for info in results:
@@ -123,24 +123,24 @@ class TestRemote(TestBase):
assert has_one
else:
# there must be a remote commit
- if info.flags & info.DELETED == 0:
+ if info.flags & info.DELETED == 0:
assert isinstance(info.local_ref, Reference)
else:
assert info.local_ref is None
assert type(info.remote_ref) in (TagReference, RemoteReference)
# END error checking
- # END for each info
-
+ # END for each info
+
def _commit_random_file(self, repo):
- #Create a file with a random name and random data and commit it to repo.
+ # Create a file with a random name and random data and commit it to repo.
# Return the commited absolute file path
index = repo.index
- new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
+ new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo)
index.add([new_file])
index.commit("Committing %s" % new_file)
return new_file
-
- def _do_test_fetch(self,remote, rw_repo, remote_repo):
+
+ def _do_test_fetch(self, remote, rw_repo, remote_repo):
def fetch_and_test(remote, **kwargs):
progress = TestRemoteProgress()
kwargs['progress'] = progress
@@ -149,84 +149,84 @@ class TestRemote(TestBase):
self._do_test_fetch_result(res, remote)
return res
# END fetch and check
-
+
def get_info(res, remote, name):
- return res["%s/%s"%(remote,name)]
-
+ return res["%s/%s" % (remote, name)]
+
# put remote head to master as it is garantueed to exist
remote_repo.head.reference = remote_repo.heads.master
-
+
res = fetch_and_test(remote)
# all uptodate
for info in res:
assert info.flags & info.HEAD_UPTODATE
-
+
# rewind remote head to trigger rejection
# index must be false as remote is a bare repo
rhead = remote_repo.head
remote_commit = rhead.commit
rhead.reset("HEAD~2", index=False)
res = fetch_and_test(remote)
- mkey = "%s/%s"%(remote,'master')
+ mkey = "%s/%s" % (remote, 'master')
master_info = res[mkey]
assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
-
+
# normal fast forward - set head back to previous one
rhead.commit = remote_commit
res = fetch_and_test(remote)
assert res[mkey].flags & FetchInfo.FAST_FORWARD
-
+
# new remote branch
new_remote_branch = Head.create(remote_repo, "new_branch")
res = fetch_and_test(remote)
new_branch_info = get_info(res, remote, new_remote_branch)
assert new_branch_info.flags & FetchInfo.NEW_HEAD
-
+
# remote branch rename ( causes creation of a new one locally )
new_remote_branch.rename("other_branch_name")
res = fetch_and_test(remote)
other_branch_info = get_info(res, remote, new_remote_branch)
assert other_branch_info.ref.commit == new_branch_info.ref.commit
-
+
# remove new branch
Head.delete(new_remote_branch.repo, new_remote_branch)
res = fetch_and_test(remote)
# deleted remote will not be fetched
self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
-
+
# prune stale tracking branches
stale_refs = remote.stale_refs
assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
RemoteReference.delete(rw_repo, *stale_refs)
-
+
# test single branch fetch with refspec including target remote
- res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
+ res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote)
assert len(res) == 1 and get_info(res, remote, 'master')
-
+
# ... with respec and no target
res = fetch_and_test(remote, refspec='master')
assert len(res) == 1
-
+
# add new tag reference
rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
assert tinfo.flags & tinfo.NEW_TAG
-
+
# adjust tag commit
Reference.set_object(rtag, rhead.commit.parents[0].parents[0])
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert tinfo.commit == rtag.commit
assert tinfo.flags & tinfo.TAG_UPDATE
-
+
# delete remote tag - local one will stay
TagReference.delete(remote_repo, rtag)
res = fetch_and_test(remote, tags=True)
self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
-
- # provoke to receive actual objects to see what kind of output we have to
+
+ # provoke to receive actual objects to see what kind of output we have to
# expect. For that we need a remote transport protocol
# Create a new UN-shared repo and fetch into it after we pushed a change
# to the shared repo
@@ -234,31 +234,31 @@ class TestRemote(TestBase):
# must clone with a local path for the repo implementation not to freak out
# as it wants local paths only ( which I can understand )
other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = "git://localhost%s"%remote_repo.git_dir
-
+ remote_repo_url = "git://localhost%s" % remote_repo.git_dir
+
# put origin to git-url
- other_origin = other_repo.remotes.origin
+ other_origin = other_repo.remotes.origin
other_origin.config_writer.set("url", remote_repo_url)
# it automatically creates alternates as remote_repo is shared as well.
# It will use the transport though and ignore alternates when fetching
# assert not other_repo.alternates # this would fail
-
+
# assure we are in the right state
rw_repo.head.reset(remote.refs.master, working_tree=True)
try:
self._commit_random_file(rw_repo)
remote.push(rw_repo.head.reference)
-
- # here I would expect to see remote-information about packing
- # objects and so on. Unfortunately, this does not happen
+
+ # here I would expect to see remote-information about packing
+ # objects and so on. Unfortunately, this does not happen
# if we are redirecting the output - git explicitly checks for this
# and only provides progress information to ttys
res = fetch_and_test(other_origin)
finally:
shutil.rmtree(other_repo_dir)
# END test and cleanup
-
- def _verify_push_and_pull(self,remote, rw_repo, remote_repo):
+
+ def _verify_push_and_pull(self, remote, rw_repo, remote_repo):
# push our changes
lhead = rw_repo.head
lindex = rw_repo.index
@@ -266,16 +266,16 @@ class TestRemote(TestBase):
try:
lhead.reference = rw_repo.heads.master
except AttributeError:
- # if the author is on a non-master branch, the clones might not have
+ # if the author is on a non-master branch, the clones might not have
# a local master yet. We simply create it
lhead.reference = rw_repo.create_head('master')
- # END master handling
+ # END master handling
lhead.reset(remote.refs.master, working_tree=True)
-
+
# push without spec should fail ( without further configuration )
# well, works nicely
# self.failUnlessRaises(GitCommandError, remote.push)
-
+
# simple file push
self._commit_random_file(rw_repo)
progress = TestRemoteProgress()
@@ -283,25 +283,25 @@ class TestRemote(TestBase):
assert isinstance(res, IterableList)
self._do_test_push_result(res, remote)
progress.make_assertion()
-
+
# rejected - undo last commit
lhead.reset("HEAD~1")
res = remote.push(lhead.reference)
- assert res[0].flags & PushInfo.ERROR
+ assert res[0].flags & PushInfo.ERROR
assert res[0].flags & PushInfo.REJECTED
self._do_test_push_result(res, remote)
-
+
# force rejected pull
res = remote.push('+%s' % lhead.reference)
- assert res[0].flags & PushInfo.ERROR == 0
+ assert res[0].flags & PushInfo.ERROR == 0
assert res[0].flags & PushInfo.FORCED_UPDATE
self._do_test_push_result(res, remote)
-
+
# invalid refspec
res = remote.push("hellothere")
assert len(res) == 0
-
- # push new tags
+
+ # push new tags
progress = TestRemoteProgress()
to_be_updated = "my_tag.1.0RV"
new_tag = TagReference.create(rw_repo, to_be_updated)
@@ -310,26 +310,26 @@ class TestRemote(TestBase):
assert res[-1].flags & PushInfo.NEW_TAG
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# update push new tags
# Rejection is default
new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
res = remote.push(tags=True)
self._do_test_push_result(res, remote)
assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
-
+
# push force this tag
res = remote.push("+%s" % new_tag.path)
assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
-
+
# delete tag - have to do it using refspec
res = remote.push(":%s" % new_tag.path)
self._do_test_push_result(res, remote)
assert res[0].flags & PushInfo.DELETED
- # Currently progress is not properly transferred, especially not using
+ # Currently progress is not properly transferred, especially not using
# the git daemon
# progress.assert_received_message()
-
+
# push new branch
new_head = Head.create(rw_repo, "my_new_branch")
progress = TestRemoteProgress()
@@ -337,20 +337,20 @@ class TestRemote(TestBase):
assert res[0].flags & PushInfo.NEW_HEAD
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# delete new branch on the remote end and locally
res = remote.push(":%s" % new_head.path)
self._do_test_push_result(res, remote)
Head.delete(rw_repo, new_head)
assert res[-1].flags & PushInfo.DELETED
-
+
# --all
res = remote.push(all=True)
self._do_test_push_result(res, remote)
-
+
remote.pull('master')
-
- # cleanup - delete created tags and branches as we are in an innerloop on
+
+ # cleanup - delete created tags and branches as we are in an innerloop on
# the same repository
TagReference.delete(rw_repo, new_tag, other_tag)
remote.push(":%s" % other_tag.path)
@@ -359,29 +359,28 @@ class TestRemote(TestBase):
# If you see this, plesase remind yourself, that all this needs to be run
# per repository type !
raise SkipTest("todo")
-
@with_rw_and_rw_remote_repo('0.1.6')
def test_base(self, rw_repo, remote_repo):
num_remotes = 0
remote_set = set()
ran_fetch_test = False
-
+
for remote in rw_repo.remotes:
num_remotes += 1
assert remote == remote
assert str(remote) != repr(remote)
remote_set.add(remote)
remote_set.add(remote) # should already exist
-
- # REFS
+
+ # REFS
refs = remote.refs
assert refs
for ref in refs:
assert ref.remote_name == remote.name
assert ref.remote_head
# END for each ref
-
+
# OPTIONS
# cannot use 'fetch' key anymore as it is now a method
for opt in ("url", ):
@@ -389,10 +388,10 @@ class TestRemote(TestBase):
reader = remote.config_reader
assert reader.get(opt) == val
assert reader.get_value(opt, None) == val
-
+
# unable to write with a reader
self.failUnlessRaises(IOError, reader.set, opt, "test")
-
+
# change value
writer = remote.config_writer
new_val = "myval"
@@ -402,9 +401,9 @@ class TestRemote(TestBase):
assert writer.get(opt) == val
del(writer)
assert getattr(remote, opt) == val
- # END for each default option key
-
- # RENAME
+ # END for each default option key
+
+ # RENAME
other_name = "totally_other_name"
prev_name = remote.name
assert remote.rename(other_name) == remote
@@ -413,46 +412,43 @@ class TestRemote(TestBase):
for time in range(2):
assert remote.rename(prev_name).name == prev_name
# END for each rename ( back to prev_name )
-
+
# PUSH/PULL TESTING
self._verify_push_and_pull(remote, rw_repo, remote_repo)
-
+
# FETCH TESTING
- # Only for remotes - local cases are the same or less complicated
+ # Only for remotes - local cases are the same or less complicated
# as additional progress information will never be emitted
if remote.name == "daemon_origin":
self._do_test_fetch(remote, rw_repo, remote_repo)
ran_fetch_test = True
- # END fetch test
-
+ # END fetch test
+
remote.update()
# END for each remote
-
+
assert ran_fetch_test
assert num_remotes
assert num_remotes == len(remote_set)
-
+
origin = rw_repo.remote('origin')
assert origin == rw_repo.remotes.origin
-
+
@with_rw_repo('HEAD', bare=True)
def test_creation_and_removal(self, bare_rw_repo):
new_name = "test_new_one"
arg_list = (new_name, "git@server:hello.git")
- remote = Remote.create(bare_rw_repo, *arg_list )
+ remote = Remote.create(bare_rw_repo, *arg_list)
assert remote.name == "test_new_one"
assert remote in bare_rw_repo.remotes
-
+
# create same one again
self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
-
+
Remote.remove(bare_rw_repo, new_name)
-
+
for remote in bare_rw_repo.remotes:
if remote.name == new_name:
raise AssertionError("Remote removal failed")
# END if deleted remote matches existing remote's name
# END for each remote
-
-
-