summaryrefslogtreecommitdiff
path: root/test/test_remote.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_remote.py')
-rw-r--r--test/test_remote.py218
1 files changed, 138 insertions, 80 deletions
diff --git a/test/test_remote.py b/test/test_remote.py
index 761a7a3e..53f71e3d 100644
--- a/test/test_remote.py
+++ b/test/test_remote.py
@@ -20,7 +20,7 @@ from git import (
RemoteReference,
TagReference,
Remote,
- GitCommandError
+ GitCommandError,
)
from git.cmd import Git
from test.lib import (
@@ -28,7 +28,7 @@ from test.lib import (
with_rw_repo,
with_rw_and_rw_remote_repo,
fixture,
- GIT_DAEMON_PORT
+ GIT_DAEMON_PORT,
)
from git.util import rmtree, HIDE_WINDOWS_FREEZE_ERRORS, IterableList
import os.path as osp
@@ -39,7 +39,7 @@ random.seed(0)
class TestRemoteProgress(RemoteProgress):
- __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages')
+ __slots__ = ("_seen_lines", "_stages_per_op", "_num_progress_messages")
def __init__(self):
super(TestRemoteProgress, self).__init__()
@@ -60,21 +60,27 @@ class TestRemoteProgress(RemoteProgress):
except ValueError:
pass
- def update(self, op_code, cur_count, max_count=None, message=''):
+ def update(self, op_code, cur_count, max_count=None, message=""):
# check each stage only comes once
op_id = op_code & self.OP_MASK
assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
if op_code & self.WRITING > 0:
if op_code & self.BEGIN > 0:
- assert not message, 'should not have message when remote begins writing'
+ assert not message, "should not have message when remote begins writing"
elif op_code & self.END > 0:
assert message
- assert not message.startswith(', '), "Sanitize progress messages: '%s'" % message
- assert not message.endswith(', '), "Sanitize progress messages: '%s'" % message
+ assert not message.startswith(", "), (
+ "Sanitize progress messages: '%s'" % message
+ )
+ assert not message.endswith(", "), (
+ "Sanitize progress messages: '%s'" % message
+ )
self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK)
+ self._stages_per_op[op_id] = self._stages_per_op[op_id] | (
+ op_code & self.STAGE_MASK
+ )
if op_code & (self.WRITING | self.END) == (self.WRITING | self.END):
assert message
@@ -101,9 +107,9 @@ class TestRemoteProgress(RemoteProgress):
class TestRemote(TestBase):
-
def tearDown(self):
import gc
+
gc.collect()
def _print_fetchhead(self, repo):
@@ -140,7 +146,11 @@ class TestRemote(TestBase):
self.assertIsInstance(info.old_commit, Commit)
if info.flags & info.ERROR:
has_one = False
- for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE):
+ for bitflag in (
+ info.REJECTED,
+ info.REMOTE_REJECTED,
+ info.REMOTE_FAILURE,
+ ):
has_one |= bool(info.flags & bitflag)
# END for each bitflag
self.assertTrue(has_one)
@@ -161,15 +171,22 @@ class TestRemote(TestBase):
results.raise_if_error()
def _do_test_fetch_info(self, repo):
- self.assertRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
+ self.assertRaises(ValueError, FetchInfo._from_line, repo, "nonsense", "")
self.assertRaises(
- ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+ ValueError,
+ FetchInfo._from_line,
+ repo,
+ "? [up to date] 0.1.7RC -> origin/0.1.7RC",
+ "",
+ )
def _commit_random_file(self, repo):
# Create a file with a random name and random data and commit it to repo.
# Return the committed absolute file path
index = repo.index
- new_file = self._make_file(osp.basename(tempfile.mktemp()), str(random.random()), repo)
+ new_file = self._make_file(
+ osp.basename(tempfile.mktemp()), str(random.random()), repo
+ )
index.add([new_file])
index.commit("Committing %s" % new_file)
return new_file
@@ -180,11 +197,12 @@ class TestRemote(TestBase):
def fetch_and_test(remote, **kwargs):
progress = TestRemoteProgress()
- kwargs['progress'] = progress
+ kwargs["progress"] = progress
res = remote.fetch(**kwargs)
progress.make_assertion()
self._do_test_fetch_result(res, remote)
return res
+
# END fetch and check
def get_info(res, remote, name):
@@ -204,7 +222,7 @@ class TestRemote(TestBase):
remote_commit = rhead.commit
rhead.reset("HEAD~2", index=False)
res = fetch_and_test(remote)
- mkey = "%s/%s" % (remote, 'master')
+ mkey = "%s/%s" % (remote, "master")
master_info = res[mkey]
self.assertTrue(master_info.flags & FetchInfo.FORCED_UPDATE)
self.assertIsNotNone(master_info.note)
@@ -241,10 +259,10 @@ class TestRemote(TestBase):
# test single branch fetch with refspec including target remote
res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote)
self.assertEqual(len(res), 1)
- self.assertTrue(get_info(res, remote, 'master'))
+ self.assertTrue(get_info(res, remote, "master"))
# ... with respec and no target
- res = fetch_and_test(remote, refspec='master')
+ res = fetch_and_test(remote, refspec="master")
self.assertEqual(len(res), 1)
# ... multiple refspecs ... works, but git command returns with error if one ref is wrong without
@@ -286,8 +304,12 @@ class TestRemote(TestBase):
# must clone with a local path for the repo implementation not to freak out
# as it wants local paths only ( which I can understand )
other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = osp.basename(remote_repo.git_dir) # git-daemon runs with appropriate `--base-path`.
- remote_repo_url = Git.polish_url("git://localhost:%s/%s" % (GIT_DAEMON_PORT, remote_repo_url))
+ remote_repo_url = osp.basename(
+ remote_repo.git_dir
+ ) # git-daemon runs with appropriate `--base-path`.
+ remote_repo_url = Git.polish_url(
+ "git://localhost:%s/%s" % (GIT_DAEMON_PORT, remote_repo_url)
+ )
# put origin to git-url
other_origin = other_repo.remotes.origin
@@ -321,7 +343,7 @@ class TestRemote(TestBase):
except AttributeError:
# if the author is on a non-master branch, the clones might not have
# a local master yet. We simply create it
- lhead.reference = rw_repo.create_head('master')
+ lhead.reference = rw_repo.create_head("master")
# END master handling
lhead.reset(remote.refs.master, working_tree=True)
@@ -345,7 +367,7 @@ class TestRemote(TestBase):
self._do_test_push_result(res, remote)
# force rejected pull
- res = remote.push('+%s' % lhead.reference)
+ res = remote.push("+%s" % lhead.reference)
self.assertEqual(res[0].flags & PushInfo.ERROR, 0)
self.assertTrue(res[0].flags & PushInfo.FORCED_UPDATE)
self._do_test_push_result(res, remote)
@@ -357,7 +379,9 @@ class TestRemote(TestBase):
progress = TestRemoteProgress()
to_be_updated = "my_tag.1.0RV"
new_tag = TagReference.create(rw_repo, to_be_updated) # @UnusedVariable
- other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", logmsg="my message")
+ other_tag = TagReference.create(
+ rw_repo, "my_obj_tag.2.1aRV", logmsg="my message"
+ )
res = remote.push(progress=progress, tags=True)
self.assertTrue(res[-1].flags & PushInfo.NEW_TAG)
progress.make_assertion()
@@ -365,7 +389,9 @@ class TestRemote(TestBase):
# update push new tags
# Rejection is default
- new_tag = TagReference.create(rw_repo, to_be_updated, reference='HEAD~1', force=True)
+ new_tag = TagReference.create(
+ rw_repo, to_be_updated, reference="HEAD~1", force=True
+ )
res = remote.push(tags=True)
self._do_test_push_result(res, remote)
self.assertTrue(res[-1].flags & PushInfo.REJECTED)
@@ -411,7 +437,7 @@ class TestRemote(TestBase):
res = remote.push(all=True)
self._do_test_push_result(res, remote)
- remote.pull('master', kill_after_timeout=10.0)
+ remote.pull("master", kill_after_timeout=10.0)
# cleanup - delete created tags and branches as we are in an innerloop on
# the same repository
@@ -419,7 +445,7 @@ class TestRemote(TestBase):
remote.push(":%s" % other_tag.path, kill_after_timeout=10.0)
@skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes!")
- @with_rw_and_rw_remote_repo('0.1.6')
+ @with_rw_and_rw_remote_repo("0.1.6")
def test_base(self, rw_repo, remote_repo):
num_remotes = 0
remote_set = set()
@@ -477,8 +503,9 @@ class TestRemote(TestBase):
# Only for remotes - local cases are the same or less complicated
# as additional progress information will never be emitted
if remote.name == "daemon_origin":
- self._do_test_fetch(remote, rw_repo, remote_repo,
- kill_after_timeout=10.0)
+ self._do_test_fetch(
+ remote, rw_repo, remote_repo, kill_after_timeout=10.0
+ )
ran_fetch_test = True
# END fetch test
@@ -489,7 +516,7 @@ class TestRemote(TestBase):
self.assertTrue(num_remotes)
self.assertEqual(num_remotes, len(remote_set))
- origin = rw_repo.remote('origin')
+ origin = rw_repo.remote("origin")
assert origin == rw_repo.remotes.origin
# Verify we can handle prunes when fetching
@@ -502,15 +529,19 @@ class TestRemote(TestBase):
num_deleted = False
for branch in remote_repo.heads:
- if branch.name != 'master':
+ if branch.name != "master":
branch.delete(remote_repo, branch, force=True)
num_deleted += 1
# end
# end for each branch
self.assertGreater(num_deleted, 0)
- self.assertEqual(len(rw_repo.remotes.origin.fetch(prune=True)), 1, "deleted everything but master")
+ self.assertEqual(
+ len(rw_repo.remotes.origin.fetch(prune=True)),
+ 1,
+ "deleted everything but master",
+ )
- @with_rw_repo('HEAD', bare=True)
+ @with_rw_repo("HEAD", bare=True)
def test_creation_and_removal(self, bare_rw_repo):
new_name = "test_new_one"
arg_list = (new_name, "git@server:hello.git")
@@ -523,7 +554,9 @@ class TestRemote(TestBase):
self.assertRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
Remote.remove(bare_rw_repo, new_name)
- self.assertTrue(remote.exists()) # We still have a cache that doesn't know we were deleted by name
+ self.assertTrue(
+ remote.exists()
+ ) # We still have a cache that doesn't know we were deleted by name
remote._clear_cache()
assert not remote.exists() # Cache should be renewed now. This is an issue ...
@@ -534,86 +567,108 @@ class TestRemote(TestBase):
# END for each remote
# Issue #262 - the next call would fail if bug wasn't fixed
- bare_rw_repo.create_remote('bogus', '/bogus/path', mirror='push')
+ bare_rw_repo.create_remote("bogus", "/bogus/path", mirror="push")
def test_fetch_info(self):
# assure we can handle remote-tracking branches
- fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of "
+ fetch_info_line_fmt = (
+ "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of "
+ )
fetch_info_line_fmt += "git://github.com/gitpython-developers/GitPython"
remote_info_line_fmt = "* [new branch] nomatter -> %s"
- self.assertRaises(ValueError, FetchInfo._from_line, self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
- "269c498e56feb93e408ed4558c8138d750de8893\t\t/Users/ben/test/foo\n")
-
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "local/master",
- fetch_info_line_fmt % 'remote-tracking branch')
+ self.assertRaises(
+ ValueError,
+ FetchInfo._from_line,
+ self.rorepo,
+ remote_info_line_fmt % "refs/something/branch",
+ "269c498e56feb93e408ed4558c8138d750de8893\t\t/Users/ben/test/foo\n",
+ )
+
+ fi = FetchInfo._from_line(
+ self.rorepo,
+ remote_info_line_fmt % "local/master",
+ fetch_info_line_fmt % "remote-tracking branch",
+ )
assert not fi.ref.is_valid()
self.assertEqual(fi.ref.name, "local/master")
# handles non-default refspecs: One can specify a different path in refs/remotes
# or a special path just in refs/something for instance
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "subdir/tagname",
- fetch_info_line_fmt % 'tag')
+ fi = FetchInfo._from_line(
+ self.rorepo,
+ remote_info_line_fmt % "subdir/tagname",
+ fetch_info_line_fmt % "tag",
+ )
self.assertIsInstance(fi.ref, TagReference)
- assert fi.ref.path.startswith('refs/tags'), fi.ref.path
+ assert fi.ref.path.startswith("refs/tags"), fi.ref.path
# it could be in a remote direcftory though
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/tags/tagname",
- fetch_info_line_fmt % 'tag')
+ fi = FetchInfo._from_line(
+ self.rorepo,
+ remote_info_line_fmt % "remotename/tags/tagname",
+ fetch_info_line_fmt % "tag",
+ )
self.assertIsInstance(fi.ref, TagReference)
- assert fi.ref.path.startswith('refs/remotes/'), fi.ref.path
+ assert fi.ref.path.startswith("refs/remotes/"), fi.ref.path
# it can also be anywhere !
tag_path = "refs/something/remotename/tags/tagname"
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % tag_path,
- fetch_info_line_fmt % 'tag')
+ fi = FetchInfo._from_line(
+ self.rorepo, remote_info_line_fmt % tag_path, fetch_info_line_fmt % "tag"
+ )
self.assertIsInstance(fi.ref, TagReference)
self.assertEqual(fi.ref.path, tag_path)
# branches default to refs/remotes
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/branch",
- fetch_info_line_fmt % 'branch')
+ fi = FetchInfo._from_line(
+ self.rorepo,
+ remote_info_line_fmt % "remotename/branch",
+ fetch_info_line_fmt % "branch",
+ )
self.assertIsInstance(fi.ref, RemoteReference)
- self.assertEqual(fi.ref.remote_name, 'remotename')
+ self.assertEqual(fi.ref.remote_name, "remotename")
# but you can force it anywhere, in which case we only have a references
- fi = FetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
- fetch_info_line_fmt % 'branch')
+ fi = FetchInfo._from_line(
+ self.rorepo,
+ remote_info_line_fmt % "refs/something/branch",
+ fetch_info_line_fmt % "branch",
+ )
assert type(fi.ref) is Reference, type(fi.ref)
self.assertEqual(fi.ref.path, "refs/something/branch")
def test_uncommon_branch_names(self):
- stderr_lines = fixture('uncommon_branch_prefix_stderr').decode('ascii').splitlines()
- fetch_lines = fixture('uncommon_branch_prefix_FETCH_HEAD').decode('ascii').splitlines()
+ stderr_lines = (
+ fixture("uncommon_branch_prefix_stderr").decode("ascii").splitlines()
+ )
+ fetch_lines = (
+ fixture("uncommon_branch_prefix_FETCH_HEAD").decode("ascii").splitlines()
+ )
# The contents of the files above must be fetched with a custom refspec:
# +refs/pull/*:refs/heads/pull/*
- res = [FetchInfo._from_line('ShouldntMatterRepo', stderr, fetch_line)
- for stderr, fetch_line in zip(stderr_lines, fetch_lines)]
+ res = [
+ FetchInfo._from_line("ShouldntMatterRepo", stderr, fetch_line)
+ for stderr, fetch_line in zip(stderr_lines, fetch_lines)
+ ]
self.assertGreater(len(res), 0)
- self.assertEqual(res[0].remote_ref_path, 'refs/pull/1/head')
- self.assertEqual(res[0].ref.path, 'refs/heads/pull/1/head')
+ self.assertEqual(res[0].remote_ref_path, "refs/pull/1/head")
+ self.assertEqual(res[0].ref.path, "refs/heads/pull/1/head")
self.assertIsInstance(res[0].ref, Head)
- @with_rw_repo('HEAD', bare=False)
+ @with_rw_repo("HEAD", bare=False)
def test_multiple_urls(self, rw_repo):
# test addresses
- test1 = 'https://github.com/gitpython-developers/GitPython'
- test2 = 'https://github.com/gitpython-developers/gitdb'
- test3 = 'https://github.com/gitpython-developers/smmap'
+ test1 = "https://github.com/gitpython-developers/GitPython"
+ test2 = "https://github.com/gitpython-developers/gitdb"
+ test3 = "https://github.com/gitpython-developers/smmap"
remote = rw_repo.remotes[0]
# Testing setting a single URL
@@ -639,7 +694,7 @@ class TestRemote(TestBase):
self.assertRaises(GitCommandError, remote.set_url, test2, add=True, delete=True)
# Testing on another remote, with the add/delete URL
- remote = rw_repo.create_remote('another', url=test1)
+ remote = rw_repo.create_remote("another", url=test1)
remote.add_url(test2)
self.assertEqual(list(remote.urls), [test1, test2])
remote.add_url(test3)
@@ -653,19 +708,23 @@ class TestRemote(TestBase):
self.assertRaises(GitCommandError, remote.delete_url, test3)
def test_fetch_error(self):
- rem = self.rorepo.remote('origin')
- with self.assertRaisesRegex(GitCommandError, "[Cc]ouldn't find remote ref __BAD_REF__"):
- rem.fetch('__BAD_REF__')
+ rem = self.rorepo.remote("origin")
+ with self.assertRaisesRegex(
+ GitCommandError, "[Cc]ouldn't find remote ref __BAD_REF__"
+ ):
+ rem.fetch("__BAD_REF__")
- @with_rw_repo('0.1.6', bare=False)
+ @with_rw_repo("0.1.6", bare=False)
def test_push_error(self, repo):
- rem = repo.remote('origin')
- with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"):
- rem.push('__BAD_REF__')
+ rem = repo.remote("origin")
+ with self.assertRaisesRegex(
+ GitCommandError, "src refspec __BAD_REF__ does not match any"
+ ):
+ rem.push("__BAD_REF__")
class TestTimeouts(TestBase):
- @with_rw_repo('HEAD', bare=False)
+ @with_rw_repo("HEAD", bare=False)
def test_timeout_funcs(self, repo):
# Force error code to prevent a race condition if the python thread is
# slow
@@ -675,8 +734,7 @@ class TestTimeouts(TestBase):
f = getattr(repo.remotes.origin, function)
assert f is not None # Make sure these functions exist
_ = f() # Make sure the function runs
- with pytest.raises(GitCommandError,
- match="kill_after_timeout=0 s"):
+ with pytest.raises(GitCommandError, match="kill_after_timeout=0 s"):
f(kill_after_timeout=0)
Git.AutoInterrupt._status_code_if_terminate = default