summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd2/cmd2.py104
-rw-r--r--cmd2/utils.py6
-rw-r--r--tests/test_cmd2.py27
3 files changed, 67 insertions, 70 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index c2affe79..67cd62f3 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -1723,43 +1723,45 @@ class Cmd(cmd.Cmd):
already_redirecting = self.redirecting
# Handle any redirection for this command
- saved_state = self._redirect_output(statement)
+ redir_error, saved_state = self._redirect_output(statement)
- # See if we need to update self.redirecting
- if not already_redirecting:
- self.redirecting = saved_state.redirecting
-
- try:
- timestart = datetime.datetime.now()
- if self._in_py:
- self._last_result = None
-
- # precommand hooks
- data = plugin.PrecommandData(statement)
- for func in self._precmd_hooks:
- data = func(data)
- statement = data.statement
- # call precmd() for compatibility with cmd.Cmd
- statement = self.precmd(statement)
-
- # go run the command function
- stop = self.onecmd(statement)
-
- # postcommand hooks
- data = plugin.PostcommandData(stop, statement)
- for func in self._postcmd_hooks:
- data = func(data)
- # retrieve the final value of stop, ignoring any statement modification from the hooks
- stop = data.stop
- # call postcmd() for compatibility with cmd.Cmd
- stop = self.postcmd(stop, statement)
-
- if self.timing:
- self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
- finally:
- self._restore_output(statement, saved_state)
+ # Do not continue if an error occurred while trying to redirect
+ if not redir_error:
+ # See if we need to update self.redirecting
if not already_redirecting:
- self.redirecting = False
+ self.redirecting = saved_state.redirecting
+
+ try:
+ timestart = datetime.datetime.now()
+ if self._in_py:
+ self._last_result = None
+
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
+ statement = self.precmd(statement)
+
+ # go run the command function
+ stop = self.onecmd(statement)
+
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
+ stop = self.postcmd(stop, statement)
+
+ if self.timing:
+ self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
+ finally:
+ self._restore_output(statement, saved_state)
+ if not already_redirecting:
+ self.redirecting = False
except EmptyStatement:
# don't do anything, but do allow command finalization hooks to run
@@ -1885,19 +1887,20 @@ class Cmd(cmd.Cmd):
raise EmptyStatement()
return statement
- def _redirect_output(self, statement: Statement) -> RedirectionSavedState:
+ def _redirect_output(self, statement: Statement) -> Tuple[bool, RedirectionSavedState]:
"""Handles output redirection for >, >>, and |.
:param statement: a parsed statement from the user
- :return: A RedirectionSavedState object
+ :return: A bool telling if an error occurred and a RedirectionSavedState object
"""
import io
import subprocess
- ret_val = RedirectionSavedState(redirecting=False, piping=False)
+ redir_error = False
+ saved_state = RedirectionSavedState(redirecting=False, piping=False)
if not self.allow_redirection:
- return ret_val
+ return redir_error, saved_state
if statement.pipe_to:
# Create a pipe with read and write sides
@@ -1928,22 +1931,25 @@ class Cmd(cmd.Cmd):
creationflags=creationflags,
start_new_session=start_new_session)
- ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout,
- piping=True, pipe_proc_reader=self.pipe_proc_reader)
+ saved_state = RedirectionSavedState(redirecting=True, self_stdout=self.stdout,
+ piping=True, pipe_proc_reader=self.pipe_proc_reader)
self.pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr)
self.stdout = pipe_write
except Exception as ex:
- self.perror('Not piping because - {}'.format(ex), traceback_war=False)
+ self.perror('Failed to open pipe because - {}'.format(ex), traceback_war=False)
pipe_read.close()
pipe_write.close()
+ redir_error = True
elif statement.output:
import tempfile
if (not statement.output_to) and (not self.can_clip):
- raise EnvironmentError("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable")
+ self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable",
+ traceback_war=False)
+ redir_error = True
- if statement.output_to:
+ elif statement.output_to:
# going to a file
mode = 'w'
# statement.output can only contain
@@ -1952,19 +1958,21 @@ class Cmd(cmd.Cmd):
mode = 'a'
try:
new_stdout = open(statement.output_to, mode)
- ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout)
+ saved_state = RedirectionSavedState(redirecting=True,
+ self_stdout=self.stdout, sys_stdout=sys.stdout)
sys.stdout = self.stdout = new_stdout
except OSError as ex:
- self.perror('Not redirecting because - {}'.format(ex), traceback_war=False)
+ self.perror('Failed to redirect because - {}'.format(ex), traceback_war=False)
+ redir_error = True
else:
# going to a paste buffer
new_stdout = tempfile.TemporaryFile(mode="w+")
- ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout)
+ saved_state = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout)
sys.stdout = self.stdout = new_stdout
if statement.output == constants.REDIRECTION_APPEND:
self.poutput(get_paste_buffer())
- return ret_val
+ return redir_error, saved_state
def _restore_output(self, statement: Statement, saved_state: RedirectionSavedState) -> None:
"""Handles restoring state after output redirection as well as
diff --git a/cmd2/utils.py b/cmd2/utils.py
index 29ae332a..697cf6f8 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -455,10 +455,10 @@ class ProcReader(object):
:param to_write: the bytes being written
"""
try:
- if 'b' in stream.mode:
- stream.write(to_write)
- else:
+ if hasattr(stream, 'buffer'):
stream.buffer.write(to_write)
+ else:
+ stream.write(to_write)
except BrokenPipeError:
# This occurs if output is being piped to a process that closed
pass
diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py
index bb0eecdb..e94f1c06 100644
--- a/tests/test_cmd2.py
+++ b/tests/test_cmd2.py
@@ -572,8 +572,7 @@ def test_disallow_redirection(base_app):
# Verify that no file got created
assert not os.path.exists(filename)
-@pytest.mark.skipif(True, reason="Waiting on StdSim updates")
-def test_pipe_to_shell(base_app, capsys):
+def test_pipe_to_shell(base_app):
if sys.platform == "win32":
# Windows
command = 'help | sort'
@@ -581,27 +580,17 @@ def test_pipe_to_shell(base_app, capsys):
# Mac and Linux
# Get help on help and pipe it's output to the input of the word count shell command
command = 'help help | wc'
- # # Mac and Linux wc behave the same when piped from shell, but differently when piped stdin from file directly
- # if sys.platform == 'darwin':
- # expected = "1 11 70"
- # else:
- # expected = "1 11 70"
- # assert out.strip() == expected.strip()
- run_cmd(base_app, command)
- out, err = capsys.readouterr()
-
- # Unfortunately with the improved way of piping output to a subprocess, there isn't any good way of getting
- # access to the output produced by that subprocess within a unit test, but we can verify that no error occurred
- assert not err
+ sys.stderr = utils.StdSim(sys.stderr)
+ out = run_cmd(base_app, command)
+ assert out and not sys.stderr.getvalue()
-def test_pipe_to_shell_error(base_app, capsys):
+def test_pipe_to_shell_error(base_app):
# Try to pipe command output to a shell command that doesn't exist in order to produce an error
- run_cmd(base_app, 'help | foobarbaz.this_does_not_exist')
- out, err = capsys.readouterr()
+ sys.stderr = utils.StdSim(sys.stderr)
+ out = run_cmd(base_app, 'help | foobarbaz.this_does_not_exist')
assert not out
- assert err.startswith("Not piping because")
-
+ assert "No such file or directory" in sys.stderr.getvalue()
@pytest.mark.skipif(not clipboard.can_clip,
reason="Pyperclip could not find a copy/paste mechanism for your system")