diff options
Diffstat (limited to 'test/test_remote.py')
-rw-r--r-- | test/test_remote.py | 218 |
1 files changed, 138 insertions, 80 deletions
diff --git a/test/test_remote.py b/test/test_remote.py index 761a7a3e..53f71e3d 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -20,7 +20,7 @@ from git import ( RemoteReference, TagReference, Remote, - GitCommandError + GitCommandError, ) from git.cmd import Git from test.lib import ( @@ -28,7 +28,7 @@ from test.lib import ( with_rw_repo, with_rw_and_rw_remote_repo, fixture, - GIT_DAEMON_PORT + GIT_DAEMON_PORT, ) from git.util import rmtree, HIDE_WINDOWS_FREEZE_ERRORS, IterableList import os.path as osp @@ -39,7 +39,7 @@ random.seed(0) class TestRemoteProgress(RemoteProgress): - __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages') + __slots__ = ("_seen_lines", "_stages_per_op", "_num_progress_messages") def __init__(self): super(TestRemoteProgress, self).__init__() @@ -60,21 +60,27 @@ class TestRemoteProgress(RemoteProgress): except ValueError: pass - def update(self, op_code, cur_count, max_count=None, message=''): + def update(self, op_code, cur_count, max_count=None, message=""): # check each stage only comes once op_id = op_code & self.OP_MASK assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) if op_code & self.WRITING > 0: if op_code & self.BEGIN > 0: - assert not message, 'should not have message when remote begins writing' + assert not message, "should not have message when remote begins writing" elif op_code & self.END > 0: assert message - assert not message.startswith(', '), "Sanitize progress messages: '%s'" % message - assert not message.endswith(', '), "Sanitize progress messages: '%s'" % message + assert not message.startswith(", "), ( + "Sanitize progress messages: '%s'" % message + ) + assert not message.endswith(", "), ( + "Sanitize progress messages: '%s'" % message + ) self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK) + self._stages_per_op[op_id] = self._stages_per_op[op_id] | ( + op_code & self.STAGE_MASK + ) if op_code & (self.WRITING | self.END) == (self.WRITING | self.END): assert message @@ -101,9 +107,9 @@ class TestRemoteProgress(RemoteProgress): class TestRemote(TestBase): - def tearDown(self): import gc + gc.collect() def _print_fetchhead(self, repo): @@ -140,7 +146,11 @@ class TestRemote(TestBase): self.assertIsInstance(info.old_commit, Commit) if info.flags & info.ERROR: has_one = False - for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE): + for bitflag in ( + info.REJECTED, + info.REMOTE_REJECTED, + info.REMOTE_FAILURE, + ): has_one |= bool(info.flags & bitflag) # END for each bitflag self.assertTrue(has_one) @@ -161,15 +171,22 @@ class TestRemote(TestBase): results.raise_if_error() def _do_test_fetch_info(self, repo): - self.assertRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') + self.assertRaises(ValueError, FetchInfo._from_line, repo, "nonsense", "") self.assertRaises( - ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') + ValueError, + FetchInfo._from_line, + repo, + "? [up to date] 0.1.7RC -> origin/0.1.7RC", + "", + ) def _commit_random_file(self, repo): # Create a file with a random name and random data and commit it to repo. # Return the committed absolute file path index = repo.index - new_file = self._make_file(osp.basename(tempfile.mktemp()), str(random.random()), repo) + new_file = self._make_file( + osp.basename(tempfile.mktemp()), str(random.random()), repo + ) index.add([new_file]) index.commit("Committing %s" % new_file) return new_file @@ -180,11 +197,12 @@ class TestRemote(TestBase): def fetch_and_test(remote, **kwargs): progress = TestRemoteProgress() - kwargs['progress'] = progress + kwargs["progress"] = progress res = remote.fetch(**kwargs) progress.make_assertion() self._do_test_fetch_result(res, remote) return res + # END fetch and check def get_info(res, remote, name): @@ -204,7 +222,7 @@ class TestRemote(TestBase): remote_commit = rhead.commit rhead.reset("HEAD~2", index=False) res = fetch_and_test(remote) - mkey = "%s/%s" % (remote, 'master') + mkey = "%s/%s" % (remote, "master") master_info = res[mkey] self.assertTrue(master_info.flags & FetchInfo.FORCED_UPDATE) self.assertIsNotNone(master_info.note) @@ -241,10 +259,10 @@ class TestRemote(TestBase): # test single branch fetch with refspec including target remote res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote) self.assertEqual(len(res), 1) - self.assertTrue(get_info(res, remote, 'master')) + self.assertTrue(get_info(res, remote, "master")) # ... with respec and no target - res = fetch_and_test(remote, refspec='master') + res = fetch_and_test(remote, refspec="master") self.assertEqual(len(res), 1) # ... multiple refspecs ... works, but git command returns with error if one ref is wrong without @@ -286,8 +304,12 @@ class TestRemote(TestBase): # must clone with a local path for the repo implementation not to freak out # as it wants local paths only ( which I can understand ) other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = osp.basename(remote_repo.git_dir) # git-daemon runs with appropriate `--base-path`. - remote_repo_url = Git.polish_url("git://localhost:%s/%s" % (GIT_DAEMON_PORT, remote_repo_url)) + remote_repo_url = osp.basename( + remote_repo.git_dir + ) # git-daemon runs with appropriate `--base-path`. + remote_repo_url = Git.polish_url( + "git://localhost:%s/%s" % (GIT_DAEMON_PORT, remote_repo_url) + ) # put origin to git-url other_origin = other_repo.remotes.origin @@ -321,7 +343,7 @@ class TestRemote(TestBase): except AttributeError: # if the author is on a non-master branch, the clones might not have # a local master yet. We simply create it - lhead.reference = rw_repo.create_head('master') + lhead.reference = rw_repo.create_head("master") # END master handling lhead.reset(remote.refs.master, working_tree=True) @@ -345,7 +367,7 @@ class TestRemote(TestBase): self._do_test_push_result(res, remote) # force rejected pull - res = remote.push('+%s' % lhead.reference) + res = remote.push("+%s" % lhead.reference) self.assertEqual(res[0].flags & PushInfo.ERROR, 0) self.assertTrue(res[0].flags & PushInfo.FORCED_UPDATE) self._do_test_push_result(res, remote) @@ -357,7 +379,9 @@ class TestRemote(TestBase): progress = TestRemoteProgress() to_be_updated = "my_tag.1.0RV" new_tag = TagReference.create(rw_repo, to_be_updated) # @UnusedVariable - other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", logmsg="my message") + other_tag = TagReference.create( + rw_repo, "my_obj_tag.2.1aRV", logmsg="my message" + ) res = remote.push(progress=progress, tags=True) self.assertTrue(res[-1].flags & PushInfo.NEW_TAG) progress.make_assertion() @@ -365,7 +389,9 @@ class TestRemote(TestBase): # update push new tags # Rejection is default - new_tag = TagReference.create(rw_repo, to_be_updated, reference='HEAD~1', force=True) + new_tag = TagReference.create( + rw_repo, to_be_updated, reference="HEAD~1", force=True + ) res = remote.push(tags=True) self._do_test_push_result(res, remote) self.assertTrue(res[-1].flags & PushInfo.REJECTED) @@ -411,7 +437,7 @@ class TestRemote(TestBase): res = remote.push(all=True) self._do_test_push_result(res, remote) - remote.pull('master', kill_after_timeout=10.0) + remote.pull("master", kill_after_timeout=10.0) # cleanup - delete created tags and branches as we are in an innerloop on # the same repository @@ -419,7 +445,7 @@ class TestRemote(TestBase): remote.push(":%s" % other_tag.path, kill_after_timeout=10.0) @skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes!") - @with_rw_and_rw_remote_repo('0.1.6') + @with_rw_and_rw_remote_repo("0.1.6") def test_base(self, rw_repo, remote_repo): num_remotes = 0 remote_set = set() @@ -477,8 +503,9 @@ class TestRemote(TestBase): # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted if remote.name == "daemon_origin": - self._do_test_fetch(remote, rw_repo, remote_repo, - kill_after_timeout=10.0) + self._do_test_fetch( + remote, rw_repo, remote_repo, kill_after_timeout=10.0 + ) ran_fetch_test = True # END fetch test @@ -489,7 +516,7 @@ class TestRemote(TestBase): self.assertTrue(num_remotes) self.assertEqual(num_remotes, len(remote_set)) - origin = rw_repo.remote('origin') + origin = rw_repo.remote("origin") assert origin == rw_repo.remotes.origin # Verify we can handle prunes when fetching @@ -502,15 +529,19 @@ class TestRemote(TestBase): num_deleted = False for branch in remote_repo.heads: - if branch.name != 'master': + if branch.name != "master": branch.delete(remote_repo, branch, force=True) num_deleted += 1 # end # end for each branch self.assertGreater(num_deleted, 0) - self.assertEqual(len(rw_repo.remotes.origin.fetch(prune=True)), 1, "deleted everything but master") + self.assertEqual( + len(rw_repo.remotes.origin.fetch(prune=True)), + 1, + "deleted everything but master", + ) - @with_rw_repo('HEAD', bare=True) + @with_rw_repo("HEAD", bare=True) def test_creation_and_removal(self, bare_rw_repo): new_name = "test_new_one" arg_list = (new_name, "git@server:hello.git") @@ -523,7 +554,9 @@ class TestRemote(TestBase): self.assertRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) Remote.remove(bare_rw_repo, new_name) - self.assertTrue(remote.exists()) # We still have a cache that doesn't know we were deleted by name + self.assertTrue( + remote.exists() + ) # We still have a cache that doesn't know we were deleted by name remote._clear_cache() assert not remote.exists() # Cache should be renewed now. This is an issue ... @@ -534,86 +567,108 @@ class TestRemote(TestBase): # END for each remote # Issue #262 - the next call would fail if bug wasn't fixed - bare_rw_repo.create_remote('bogus', '/bogus/path', mirror='push') + bare_rw_repo.create_remote("bogus", "/bogus/path", mirror="push") def test_fetch_info(self): # assure we can handle remote-tracking branches - fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of " + fetch_info_line_fmt = ( + "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of " + ) fetch_info_line_fmt += "git://github.com/gitpython-developers/GitPython" remote_info_line_fmt = "* [new branch] nomatter -> %s" - self.assertRaises(ValueError, FetchInfo._from_line, self.rorepo, - remote_info_line_fmt % "refs/something/branch", - "269c498e56feb93e408ed4558c8138d750de8893\t\t/Users/ben/test/foo\n") - - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "local/master", - fetch_info_line_fmt % 'remote-tracking branch') + self.assertRaises( + ValueError, + FetchInfo._from_line, + self.rorepo, + remote_info_line_fmt % "refs/something/branch", + "269c498e56feb93e408ed4558c8138d750de8893\t\t/Users/ben/test/foo\n", + ) + + fi = FetchInfo._from_line( + self.rorepo, + remote_info_line_fmt % "local/master", + fetch_info_line_fmt % "remote-tracking branch", + ) assert not fi.ref.is_valid() self.assertEqual(fi.ref.name, "local/master") # handles non-default refspecs: One can specify a different path in refs/remotes # or a special path just in refs/something for instance - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "subdir/tagname", - fetch_info_line_fmt % 'tag') + fi = FetchInfo._from_line( + self.rorepo, + remote_info_line_fmt % "subdir/tagname", + fetch_info_line_fmt % "tag", + ) self.assertIsInstance(fi.ref, TagReference) - assert fi.ref.path.startswith('refs/tags'), fi.ref.path + assert fi.ref.path.startswith("refs/tags"), fi.ref.path # it could be in a remote direcftory though - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/tags/tagname", - fetch_info_line_fmt % 'tag') + fi = FetchInfo._from_line( + self.rorepo, + remote_info_line_fmt % "remotename/tags/tagname", + fetch_info_line_fmt % "tag", + ) self.assertIsInstance(fi.ref, TagReference) - assert fi.ref.path.startswith('refs/remotes/'), fi.ref.path + assert fi.ref.path.startswith("refs/remotes/"), fi.ref.path # it can also be anywhere ! tag_path = "refs/something/remotename/tags/tagname" - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % tag_path, - fetch_info_line_fmt % 'tag') + fi = FetchInfo._from_line( + self.rorepo, remote_info_line_fmt % tag_path, fetch_info_line_fmt % "tag" + ) self.assertIsInstance(fi.ref, TagReference) self.assertEqual(fi.ref.path, tag_path) # branches default to refs/remotes - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/branch", - fetch_info_line_fmt % 'branch') + fi = FetchInfo._from_line( + self.rorepo, + remote_info_line_fmt % "remotename/branch", + fetch_info_line_fmt % "branch", + ) self.assertIsInstance(fi.ref, RemoteReference) - self.assertEqual(fi.ref.remote_name, 'remotename') + self.assertEqual(fi.ref.remote_name, "remotename") # but you can force it anywhere, in which case we only have a references - fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "refs/something/branch", - fetch_info_line_fmt % 'branch') + fi = FetchInfo._from_line( + self.rorepo, + remote_info_line_fmt % "refs/something/branch", + fetch_info_line_fmt % "branch", + ) assert type(fi.ref) is Reference, type(fi.ref) self.assertEqual(fi.ref.path, "refs/something/branch") def test_uncommon_branch_names(self): - stderr_lines = fixture('uncommon_branch_prefix_stderr').decode('ascii').splitlines() - fetch_lines = fixture('uncommon_branch_prefix_FETCH_HEAD').decode('ascii').splitlines() + stderr_lines = ( + fixture("uncommon_branch_prefix_stderr").decode("ascii").splitlines() + ) + fetch_lines = ( + fixture("uncommon_branch_prefix_FETCH_HEAD").decode("ascii").splitlines() + ) # The contents of the files above must be fetched with a custom refspec: # +refs/pull/*:refs/heads/pull/* - res = [FetchInfo._from_line('ShouldntMatterRepo', stderr, fetch_line) - for stderr, fetch_line in zip(stderr_lines, fetch_lines)] + res = [ + FetchInfo._from_line("ShouldntMatterRepo", stderr, fetch_line) + for stderr, fetch_line in zip(stderr_lines, fetch_lines) + ] self.assertGreater(len(res), 0) - self.assertEqual(res[0].remote_ref_path, 'refs/pull/1/head') - self.assertEqual(res[0].ref.path, 'refs/heads/pull/1/head') + self.assertEqual(res[0].remote_ref_path, "refs/pull/1/head") + self.assertEqual(res[0].ref.path, "refs/heads/pull/1/head") self.assertIsInstance(res[0].ref, Head) - @with_rw_repo('HEAD', bare=False) + @with_rw_repo("HEAD", bare=False) def test_multiple_urls(self, rw_repo): # test addresses - test1 = 'https://github.com/gitpython-developers/GitPython' - test2 = 'https://github.com/gitpython-developers/gitdb' - test3 = 'https://github.com/gitpython-developers/smmap' + test1 = "https://github.com/gitpython-developers/GitPython" + test2 = "https://github.com/gitpython-developers/gitdb" + test3 = "https://github.com/gitpython-developers/smmap" remote = rw_repo.remotes[0] # Testing setting a single URL @@ -639,7 +694,7 @@ class TestRemote(TestBase): self.assertRaises(GitCommandError, remote.set_url, test2, add=True, delete=True) # Testing on another remote, with the add/delete URL - remote = rw_repo.create_remote('another', url=test1) + remote = rw_repo.create_remote("another", url=test1) remote.add_url(test2) self.assertEqual(list(remote.urls), [test1, test2]) remote.add_url(test3) @@ -653,19 +708,23 @@ class TestRemote(TestBase): self.assertRaises(GitCommandError, remote.delete_url, test3) def test_fetch_error(self): - rem = self.rorepo.remote('origin') - with self.assertRaisesRegex(GitCommandError, "[Cc]ouldn't find remote ref __BAD_REF__"): - rem.fetch('__BAD_REF__') + rem = self.rorepo.remote("origin") + with self.assertRaisesRegex( + GitCommandError, "[Cc]ouldn't find remote ref __BAD_REF__" + ): + rem.fetch("__BAD_REF__") - @with_rw_repo('0.1.6', bare=False) + @with_rw_repo("0.1.6", bare=False) def test_push_error(self, repo): - rem = repo.remote('origin') - with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"): - rem.push('__BAD_REF__') + rem = repo.remote("origin") + with self.assertRaisesRegex( + GitCommandError, "src refspec __BAD_REF__ does not match any" + ): + rem.push("__BAD_REF__") class TestTimeouts(TestBase): - @with_rw_repo('HEAD', bare=False) + @with_rw_repo("HEAD", bare=False) def test_timeout_funcs(self, repo): # Force error code to prevent a race condition if the python thread is # slow @@ -675,8 +734,7 @@ class TestTimeouts(TestBase): f = getattr(repo.remotes.origin, function) assert f is not None # Make sure these functions exist _ = f() # Make sure the function runs - with pytest.raises(GitCommandError, - match="kill_after_timeout=0 s"): + with pytest.raises(GitCommandError, match="kill_after_timeout=0 s"): f(kill_after_timeout=0) Git.AutoInterrupt._status_code_if_terminate = default |