diff options
| author | Antoine Musso <hashar@free.fr> | 2014-11-16 20:15:50 +0100 | 
|---|---|---|
| committer | Antoine Musso <hashar@free.fr> | 2014-11-16 20:46:41 +0100 | 
| commit | f5d11b750ecc982541d1f936488248f0b42d75d3 (patch) | |
| tree | 8be522510315f5adc32c0c55acd45dc1074294da /git/test/test_remote.py | |
| parent | 7aba59a2609ec768d5d495dafd23a4bce8179741 (diff) | |
| download | gitpython-f5d11b750ecc982541d1f936488248f0b42d75d3.tar.gz | |
pep8 linting (whitespaces)
W191 indentation contains tabs
E221 multiple spaces before operator
E222 multiple spaces after operator
E225 missing whitespace around operator
E271 multiple spaces after keyword
W292 no newline at end of file
W293 blank line contains whitespace
W391 blank line at end of file
Diffstat (limited to 'git/test/test_remote.py')
| -rw-r--r-- | git/test/test_remote.py | 164 | 
1 files changed, 81 insertions, 83 deletions
| diff --git a/git/test/test_remote.py b/git/test/test_remote.py index b1248096..0b7ab574 100644 --- a/git/test/test_remote.py +++ b/git/test/test_remote.py @@ -22,7 +22,7 @@ class TestRemoteProgress(RemoteProgress):          self._seen_lines = list()          self._stages_per_op = dict()          self._num_progress_messages = 0 -         +      def _parse_progress_line(self, line):          # we may remove the line later if it is dropped          # Keep it for debugging @@ -30,37 +30,37 @@ class TestRemoteProgress(RemoteProgress):          rval = super(TestRemoteProgress, self)._parse_progress_line(line)          assert len(line) > 1, "line %r too short" % line          return rval -         +      def line_dropped(self, line):          try:              self._seen_lines.remove(line)          except ValueError:              pass -         +      def update(self, op_code, cur_count, max_count=None, message=''):          # check each stage only comes once          op_id = op_code & self.OP_MASK          assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) -         +          self._stages_per_op.setdefault(op_id, 0)          self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) -         +          if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):              assert message          # END check we get message -         +          self._num_progress_messages += 1 -         -         + +      def make_assertion(self):          # we don't always receive messages          if not self._seen_lines:              return -         +          # sometimes objects are not compressed which is okay          assert len(self._seen_ops) in (2,3)          assert self._stages_per_op -         +          # must have seen all stages          for op, stages in self._stages_per_op.items():              assert stages & self.STAGE_MASK == self.STAGE_MASK @@ -68,15 +68,15 @@ class TestRemoteProgress(RemoteProgress):      def assert_received_message(self):          assert self._num_progress_messages -     +  class TestRemote(TestBase): -     +      def _print_fetchhead(self, repo):          fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))          fp.close() -         -         + +      def _do_test_fetch_result(self, results, remote):          # self._print_fetchhead(remote.repo)          assert len(results) > 0 and isinstance(results[0], FetchInfo) @@ -92,7 +92,7 @@ class TestRemote(TestBase):                  assert info.old_commit is None              # END forced update checking            # END for each info -         +      def _do_test_push_result(self, results, remote):          assert len(results) > 0 and isinstance(results[0], PushInfo)          for info in results: @@ -115,12 +115,12 @@ class TestRemote(TestBase):                  assert type(info.remote_ref) in (TagReference, RemoteReference)              # END error checking          # END for each info  -         -         + +      def _do_test_fetch_info(self, repo):          self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')          self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date]      0.1.7RC    -> origin/0.1.7RC", '') -         +      def _commit_random_file(self, repo):          #Create a file with a random name and random data and commit it to  repo.          # Return the commited absolute file path @@ -129,11 +129,11 @@ class TestRemote(TestBase):          index.add([new_file])          index.commit("Committing %s" % new_file)          return new_file -         +      def _do_test_fetch(self,remote, rw_repo, remote_repo):          # specialized fetch testing to de-clutter the main test          self._do_test_fetch_info(rw_repo) -         +          def fetch_and_test(remote, **kwargs):              progress = TestRemoteProgress()              kwargs['progress'] = progress @@ -142,18 +142,18 @@ class TestRemote(TestBase):              self._do_test_fetch_result(res, remote)              return res          # END fetch and check -         +          def get_info(res, remote, name):              return res["%s/%s"%(remote,name)] -         +          # put remote head to master as it is garantueed to exist          remote_repo.head.reference = remote_repo.heads.master -         +          res = fetch_and_test(remote)          # all uptodate          for info in res:              assert info.flags & info.HEAD_UPTODATE -         +          # rewind remote head to trigger rejection          # index must be false as remote is a bare repo          rhead = remote_repo.head @@ -163,39 +163,39 @@ class TestRemote(TestBase):          mkey = "%s/%s"%(remote,'master')          master_info = res[mkey]          assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None -         +          # normal fast forward - set head back to previous one          rhead.commit = remote_commit          res = fetch_and_test(remote)          assert res[mkey].flags & FetchInfo.FAST_FORWARD -         +          # new remote branch          new_remote_branch = Head.create(remote_repo, "new_branch")          res = fetch_and_test(remote)          new_branch_info = get_info(res, remote, new_remote_branch)          assert new_branch_info.flags & FetchInfo.NEW_HEAD -         +          # remote branch rename ( causes creation of a new one locally )          new_remote_branch.rename("other_branch_name")          res = fetch_and_test(remote)          other_branch_info = get_info(res, remote, new_remote_branch)          assert other_branch_info.ref.commit == new_branch_info.ref.commit -         +          # remove new branch          Head.delete(new_remote_branch.repo, new_remote_branch)          res = fetch_and_test(remote)          # deleted remote will not be fetched          self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) -         +          # prune stale tracking branches          stale_refs = remote.stale_refs          assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)          RemoteReference.delete(rw_repo, *stale_refs) -         +          # test single branch fetch with refspec including target remote          res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)          assert len(res) == 1 and get_info(res, remote, 'master') -         +          # ... with respec and no target          res = fetch_and_test(remote, refspec='master')          assert len(res) == 1 @@ -203,26 +203,26 @@ class TestRemote(TestBase):          # ... multiple refspecs          res = fetch_and_test(remote, refspec=['master', 'fred'])          assert len(res) == 1 -         +          # add new tag reference          rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")          res = fetch_and_test(remote, tags=True)          tinfo = res[str(rtag)]          assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit          assert tinfo.flags & tinfo.NEW_TAG -         +          # adjust tag commit          Reference.set_object(rtag, rhead.commit.parents[0].parents[0])          res = fetch_and_test(remote, tags=True)          tinfo = res[str(rtag)]          assert tinfo.commit == rtag.commit          assert tinfo.flags & tinfo.TAG_UPDATE -         +          # delete remote tag - local one will stay          TagReference.delete(remote_repo, rtag)          res = fetch_and_test(remote, tags=True)          self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) -         +          # provoke to receive actual objects to see what kind of output we have to           # expect. For that we need a remote transport protocol          # Create a new UN-shared repo and fetch into it after we pushed a change @@ -232,20 +232,20 @@ class TestRemote(TestBase):          # as it wants local paths only ( which I can understand )          other_repo = remote_repo.clone(other_repo_dir, shared=False)          remote_repo_url = "git://localhost%s"%remote_repo.git_dir -         +          # put origin to git-url          other_origin = other_repo.remotes.origin           other_origin.config_writer.set("url", remote_repo_url)          # it automatically creates alternates as remote_repo is shared as well.          # It will use the transport though and ignore alternates when fetching          # assert not other_repo.alternates  # this would fail -         +          # assure we are in the right state          rw_repo.head.reset(remote.refs.master, working_tree=True)          try:              self._commit_random_file(rw_repo)              remote.push(rw_repo.head.reference) -             +              # here I would expect to see remote-information about packing               # objects and so on. Unfortunately, this does not happen               # if we are redirecting the output - git explicitly checks for this @@ -254,7 +254,7 @@ class TestRemote(TestBase):          finally:              shutil.rmtree(other_repo_dir)          # END test and cleanup -         +      def _assert_push_and_pull(self,remote, rw_repo, remote_repo):          # push our changes          lhead = rw_repo.head @@ -268,11 +268,11 @@ class TestRemote(TestBase):              lhead.reference = rw_repo.create_head('master')          # END master handling           lhead.reset(remote.refs.master, working_tree=True) -         +          # push without spec should fail ( without further configuration )          # well, works nicely          # self.failUnlessRaises(GitCommandError, remote.push) -         +          # simple file push          self._commit_random_file(rw_repo)          progress = TestRemoteProgress() @@ -280,24 +280,24 @@ class TestRemote(TestBase):          assert isinstance(res, IterableList)          self._do_test_push_result(res, remote)          progress.make_assertion() -         +          # rejected - undo last commit          lhead.reset("HEAD~1")          res = remote.push(lhead.reference)          assert res[0].flags & PushInfo.ERROR           assert res[0].flags & PushInfo.REJECTED          self._do_test_push_result(res, remote) -         +          # force rejected pull          res = remote.push('+%s' % lhead.reference)          assert res[0].flags & PushInfo.ERROR == 0           assert res[0].flags & PushInfo.FORCED_UPDATE          self._do_test_push_result(res, remote) -         +          # invalid refspec          res = remote.push("hellothere")          assert len(res) == 0 -         +          # push new tags           progress = TestRemoteProgress()          to_be_updated = "my_tag.1.0RV" @@ -307,18 +307,18 @@ class TestRemote(TestBase):          assert res[-1].flags & PushInfo.NEW_TAG          progress.make_assertion()          self._do_test_push_result(res, remote) -         +          # update push new tags          # Rejection is default          new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)          res = remote.push(tags=True)          self._do_test_push_result(res, remote)          assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR -         +          # push force this tag          res = remote.push("+%s" % new_tag.path)          assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE -         +          # delete tag - have to do it using refspec          res = remote.push(":%s" % new_tag.path)          self._do_test_push_result(res, remote) @@ -326,7 +326,7 @@ class TestRemote(TestBase):          # Currently progress is not properly transferred, especially not using           # the git daemon          # progress.assert_received_message() -         +          # push new branch          new_head = Head.create(rw_repo, "my_new_branch")          progress = TestRemoteProgress() @@ -334,37 +334,37 @@ class TestRemote(TestBase):          assert res[0].flags & PushInfo.NEW_HEAD          progress.make_assertion()          self._do_test_push_result(res, remote) -         +          # delete new branch on the remote end and locally          res = remote.push(":%s" % new_head.path)          self._do_test_push_result(res, remote)          Head.delete(rw_repo, new_head)          assert res[-1].flags & PushInfo.DELETED -         +          # --all          res = remote.push(all=True)          self._do_test_push_result(res, remote) -         +          remote.pull('master') -         +          # cleanup - delete created tags and branches as we are in an innerloop on           # the same repository          TagReference.delete(rw_repo, new_tag, other_tag)          remote.push(":%s" % other_tag.path) -     +      @with_rw_and_rw_remote_repo('0.1.6')      def test_base(self, rw_repo, remote_repo):          num_remotes = 0          remote_set = set()          ran_fetch_test = False -         +          for remote in rw_repo.remotes:              num_remotes += 1              assert remote == remote              assert str(remote) != repr(remote)              remote_set.add(remote)              remote_set.add(remote)  # should already exist -             +              # REFS               refs = remote.refs              assert refs @@ -372,7 +372,7 @@ class TestRemote(TestBase):                  assert ref.remote_name == remote.name                  assert ref.remote_head              # END for each ref -             +              # OPTIONS              # cannot use 'fetch' key anymore as it is now a method              for opt in ("url", ): @@ -380,10 +380,10 @@ class TestRemote(TestBase):                  reader = remote.config_reader                  assert reader.get(opt) == val                  assert reader.get_value(opt, None) == val -                 +                  # unable to write with a reader                  self.failUnlessRaises(IOError, reader.set, opt, "test") -                 +                  # change value                  writer = remote.config_writer                  new_val = "myval" @@ -394,7 +394,7 @@ class TestRemote(TestBase):                  del(writer)                  assert getattr(remote, opt) == val              # END for each default option key  -             +              # RENAME               other_name = "totally_other_name"              prev_name = remote.name @@ -404,10 +404,10 @@ class TestRemote(TestBase):              for time in range(2):                  assert remote.rename(prev_name).name == prev_name              # END for each rename ( back to prev_name ) -             +              # PUSH/PULL TESTING              self._assert_push_and_pull(remote, rw_repo, remote_repo) -             +              # FETCH TESTING              # Only for remotes - local cases are the same or less complicated               # as additional progress information will never be emitted @@ -415,17 +415,17 @@ class TestRemote(TestBase):                  self._do_test_fetch(remote, rw_repo, remote_repo)                  ran_fetch_test = True              # END fetch test   -             +              remote.update()          # END for each remote -         +          assert ran_fetch_test          assert num_remotes          assert num_remotes == len(remote_set) -         +          origin = rw_repo.remote('origin')          assert origin == rw_repo.remotes.origin -         +      @with_rw_repo('HEAD', bare=True)      def test_creation_and_removal(self, bare_rw_repo):          new_name = "test_new_one" @@ -433,18 +433,18 @@ class TestRemote(TestBase):          remote = Remote.create(bare_rw_repo, *arg_list )          assert remote.name == "test_new_one"          assert remote in bare_rw_repo.remotes -         +          # create same one again          self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) -         +          Remote.remove(bare_rw_repo, new_name) -         +          for remote in bare_rw_repo.remotes:              if remote.name == new_name:                  raise AssertionError("Remote removal failed")              # END if deleted remote matches existing remote's name          # END for each remote -         +      def test_fetch_info(self):          # assure we can handle remote-tracking branches          fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge   %s '0.3' of git://github.com/gitpython-developers/GitPython" @@ -454,48 +454,46 @@ class TestRemote(TestBase):                              fetch_info_line_fmt % 'remote-tracking branch')          assert fi.ref.is_valid()          assert fi.ref.commit -         +          # handles non-default refspecs: One can specify a different path in refs/remotes          # or a special path just in refs/something for instance -         +          fi = FetchInfo._from_line(self.rorepo,                              remote_info_line_fmt % "subdir/tagname",                               fetch_info_line_fmt % 'tag') -         +          assert isinstance(fi.ref, TagReference)          assert fi.ref.path.startswith('refs/tags') -         +          # it could be in a remote direcftory though          fi = FetchInfo._from_line(self.rorepo,                              remote_info_line_fmt % "remotename/tags/tagname",                               fetch_info_line_fmt % 'tag') -         +          assert isinstance(fi.ref, TagReference)          assert fi.ref.path.startswith('refs/remotes/') -         +          # it can also be anywhere !          tag_path = "refs/something/remotename/tags/tagname"          fi = FetchInfo._from_line(self.rorepo,                              remote_info_line_fmt % tag_path,                               fetch_info_line_fmt % 'tag') -         +          assert isinstance(fi.ref, TagReference)          assert fi.ref.path == tag_path -         +          # branches default to refs/remotes          fi = FetchInfo._from_line(self.rorepo,                              remote_info_line_fmt % "remotename/branch",                               fetch_info_line_fmt % 'branch') -         +          assert isinstance(fi.ref, RemoteReference)          assert fi.ref.remote_name == 'remotename' -         +          # but you can force it anywhere, in which case we only have a references          fi = FetchInfo._from_line(self.rorepo,                              remote_info_line_fmt % "refs/something/branch",                               fetch_info_line_fmt % 'branch') -         +          assert type(fi.ref) is Reference          assert fi.ref.path == "refs/something/branch" -         -             | 
