diff options
-rw-r--r-- | cmd2/cmd2.py | 104 | ||||
-rw-r--r-- | cmd2/utils.py | 6 | ||||
-rw-r--r-- | tests/test_cmd2.py | 27 |
3 files changed, 67 insertions, 70 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index c2affe79..67cd62f3 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -1723,43 +1723,45 @@ class Cmd(cmd.Cmd): already_redirecting = self.redirecting # Handle any redirection for this command - saved_state = self._redirect_output(statement) + redir_error, saved_state = self._redirect_output(statement) - # See if we need to update self.redirecting - if not already_redirecting: - self.redirecting = saved_state.redirecting - - try: - timestart = datetime.datetime.now() - if self._in_py: - self._last_result = None - - # precommand hooks - data = plugin.PrecommandData(statement) - for func in self._precmd_hooks: - data = func(data) - statement = data.statement - # call precmd() for compatibility with cmd.Cmd - statement = self.precmd(statement) - - # go run the command function - stop = self.onecmd(statement) - - # postcommand hooks - data = plugin.PostcommandData(stop, statement) - for func in self._postcmd_hooks: - data = func(data) - # retrieve the final value of stop, ignoring any statement modification from the hooks - stop = data.stop - # call postcmd() for compatibility with cmd.Cmd - stop = self.postcmd(stop, statement) - - if self.timing: - self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) - finally: - self._restore_output(statement, saved_state) + # Do not continue if an error occurred while trying to redirect + if not redir_error: + # See if we need to update self.redirecting if not already_redirecting: - self.redirecting = False + self.redirecting = saved_state.redirecting + + try: + timestart = datetime.datetime.now() + if self._in_py: + self._last_result = None + + # precommand hooks + data = plugin.PrecommandData(statement) + for func in self._precmd_hooks: + data = func(data) + statement = data.statement + # call precmd() for compatibility with cmd.Cmd + statement = self.precmd(statement) + + # go run the command function + stop = self.onecmd(statement) + + # postcommand hooks + data = plugin.PostcommandData(stop, statement) + for func in self._postcmd_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any statement modification from the hooks + stop = data.stop + # call postcmd() for compatibility with cmd.Cmd + stop = self.postcmd(stop, statement) + + if self.timing: + self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) + finally: + self._restore_output(statement, saved_state) + if not already_redirecting: + self.redirecting = False except EmptyStatement: # don't do anything, but do allow command finalization hooks to run @@ -1885,19 +1887,20 @@ class Cmd(cmd.Cmd): raise EmptyStatement() return statement - def _redirect_output(self, statement: Statement) -> RedirectionSavedState: + def _redirect_output(self, statement: Statement) -> Tuple[bool, RedirectionSavedState]: """Handles output redirection for >, >>, and |. :param statement: a parsed statement from the user - :return: A RedirectionSavedState object + :return: A bool telling if an error occurred and a RedirectionSavedState object """ import io import subprocess - ret_val = RedirectionSavedState(redirecting=False, piping=False) + redir_error = False + saved_state = RedirectionSavedState(redirecting=False, piping=False) if not self.allow_redirection: - return ret_val + return redir_error, saved_state if statement.pipe_to: # Create a pipe with read and write sides @@ -1928,22 +1931,25 @@ class Cmd(cmd.Cmd): creationflags=creationflags, start_new_session=start_new_session) - ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, - piping=True, pipe_proc_reader=self.pipe_proc_reader) + saved_state = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, + piping=True, pipe_proc_reader=self.pipe_proc_reader) self.pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) self.stdout = pipe_write except Exception as ex: - self.perror('Not piping because - {}'.format(ex), traceback_war=False) + self.perror('Failed to open pipe because - {}'.format(ex), traceback_war=False) pipe_read.close() pipe_write.close() + redir_error = True elif statement.output: import tempfile if (not statement.output_to) and (not self.can_clip): - raise EnvironmentError("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable") + self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable", + traceback_war=False) + redir_error = True - if statement.output_to: + elif statement.output_to: # going to a file mode = 'w' # statement.output can only contain @@ -1952,19 +1958,21 @@ class Cmd(cmd.Cmd): mode = 'a' try: new_stdout = open(statement.output_to, mode) - ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout) + saved_state = RedirectionSavedState(redirecting=True, + self_stdout=self.stdout, sys_stdout=sys.stdout) sys.stdout = self.stdout = new_stdout except OSError as ex: - self.perror('Not redirecting because - {}'.format(ex), traceback_war=False) + self.perror('Failed to redirect because - {}'.format(ex), traceback_war=False) + redir_error = True else: # going to a paste buffer new_stdout = tempfile.TemporaryFile(mode="w+") - ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout) + saved_state = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout) sys.stdout = self.stdout = new_stdout if statement.output == constants.REDIRECTION_APPEND: self.poutput(get_paste_buffer()) - return ret_val + return redir_error, saved_state def _restore_output(self, statement: Statement, saved_state: RedirectionSavedState) -> None: """Handles restoring state after output redirection as well as diff --git a/cmd2/utils.py b/cmd2/utils.py index 29ae332a..697cf6f8 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -455,10 +455,10 @@ class ProcReader(object): :param to_write: the bytes being written """ try: - if 'b' in stream.mode: - stream.write(to_write) - else: + if hasattr(stream, 'buffer'): stream.buffer.write(to_write) + else: + stream.write(to_write) except BrokenPipeError: # This occurs if output is being piped to a process that closed pass diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index bb0eecdb..e94f1c06 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -572,8 +572,7 @@ def test_disallow_redirection(base_app): # Verify that no file got created assert not os.path.exists(filename) -@pytest.mark.skipif(True, reason="Waiting on StdSim updates") -def test_pipe_to_shell(base_app, capsys): +def test_pipe_to_shell(base_app): if sys.platform == "win32": # Windows command = 'help | sort' @@ -581,27 +580,17 @@ def test_pipe_to_shell(base_app, capsys): # Mac and Linux # Get help on help and pipe it's output to the input of the word count shell command command = 'help help | wc' - # # Mac and Linux wc behave the same when piped from shell, but differently when piped stdin from file directly - # if sys.platform == 'darwin': - # expected = "1 11 70" - # else: - # expected = "1 11 70" - # assert out.strip() == expected.strip() - run_cmd(base_app, command) - out, err = capsys.readouterr() - - # Unfortunately with the improved way of piping output to a subprocess, there isn't any good way of getting - # access to the output produced by that subprocess within a unit test, but we can verify that no error occurred - assert not err + sys.stderr = utils.StdSim(sys.stderr) + out = run_cmd(base_app, command) + assert out and not sys.stderr.getvalue() -def test_pipe_to_shell_error(base_app, capsys): +def test_pipe_to_shell_error(base_app): # Try to pipe command output to a shell command that doesn't exist in order to produce an error - run_cmd(base_app, 'help | foobarbaz.this_does_not_exist') - out, err = capsys.readouterr() + sys.stderr = utils.StdSim(sys.stderr) + out = run_cmd(base_app, 'help | foobarbaz.this_does_not_exist') assert not out - assert err.startswith("Not piping because") - + assert "No such file or directory" in sys.stderr.getvalue() @pytest.mark.skipif(not clipboard.can_clip, reason="Pyperclip could not find a copy/paste mechanism for your system") |