summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd2/cmd2.py145
-rw-r--r--cmd2/exceptions.py5
-rw-r--r--cmd2/utils.py26
-rwxr-xr-xtests/test_cmd2.py34
4 files changed, 97 insertions, 113 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index de7e7f5a..e0c28941 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -49,7 +49,7 @@ from . import utils
from .argparse_custom import CompletionItem, DEFAULT_ARGUMENT_PARSER
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .decorators import with_argparser
-from .exceptions import Cmd2ArgparseError, Cmd2ShlexError, EmbeddedConsoleExit, EmptyStatement
+from .exceptions import Cmd2ArgparseError, Cmd2ShlexError, EmbeddedConsoleExit, EmptyStatement, RedirectionError
from .history import History, HistoryItem
from .parsing import StatementParser, Statement, Macro, MacroArg, shlex_split
from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt, rl_warning
@@ -1600,21 +1600,19 @@ class Cmd(cmd.Cmd):
import datetime
stop = False
+ statement = None
+
try:
+ # Convert the line into a Statement
statement = self._input_line_to_statement(line)
- except (EmptyStatement, Cmd2ShlexError) as ex:
- if isinstance(ex, Cmd2ShlexError):
- self.perror("Invalid syntax: {}".format(ex))
- return self._run_cmdfinalization_hooks(stop, None)
- # now that we have a statement, run it with all the hooks
- try:
# call the postparsing hooks
data = plugin.PostparsingData(False, statement)
for func in self._postparsing_hooks:
data = func(data)
if data.stop:
break
+
# unpack the data object
statement = data.statement
stop = data.stop
@@ -1623,11 +1621,8 @@ class Cmd(cmd.Cmd):
# we need to run the finalization hooks
raise EmptyStatement
- # Keep track of whether or not we were already _redirecting before this command
- already_redirecting = self._redirecting
-
# This will be a utils.RedirectionSavedState object for the command
- saved_state = None
+ redir_saved_state = None
try:
# Get sigint protection while we set up redirection
@@ -1636,50 +1631,40 @@ class Cmd(cmd.Cmd):
# Start saving command's stdout at this point
self.stdout.pause_storage = False
- redir_error, saved_state = self._redirect_output(statement)
- self._cur_pipe_proc_reader = saved_state.pipe_proc_reader
+ redir_saved_state = self._redirect_output(statement)
- # Do not continue if an error occurred while trying to redirect
- if not redir_error:
- # See if we need to update self._redirecting
- if not already_redirecting:
- self._redirecting = saved_state.redirecting
+ timestart = datetime.datetime.now()
- timestart = datetime.datetime.now()
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
- # precommand hooks
- data = plugin.PrecommandData(statement)
- for func in self._precmd_hooks:
- data = func(data)
- statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
+ statement = self.precmd(statement)
- # call precmd() for compatibility with cmd.Cmd
- statement = self.precmd(statement)
+ # go run the command function
+ stop = self.onecmd(statement, add_to_history=add_to_history)
- # go run the command function
- stop = self.onecmd(statement, add_to_history=add_to_history)
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
- # postcommand hooks
- data = plugin.PostcommandData(stop, statement)
- for func in self._postcmd_hooks:
- data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
- # retrieve the final value of stop, ignoring any statement modification from the hooks
- stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
+ stop = self.postcmd(stop, statement)
- # call postcmd() for compatibility with cmd.Cmd
- stop = self.postcmd(stop, statement)
-
- if self.timing:
- self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart))
+ if self.timing:
+ self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart))
finally:
# Get sigint protection while we restore stuff
with self.sigint_protection:
- if saved_state is not None:
- self._restore_output(statement, saved_state)
-
- if not already_redirecting:
- self._redirecting = False
+ if redir_saved_state is not None:
+ self._restore_output(statement, redir_saved_state)
if py_bridge_call:
# Stop saving command's stdout before command finalization hooks run
@@ -1690,6 +1675,10 @@ class Cmd(cmd.Cmd):
except (Cmd2ArgparseError, EmptyStatement):
# Don't do anything, but do allow command finalization hooks to run
pass
+ except Cmd2ShlexError as ex:
+ self.perror("Invalid syntax: {}".format(ex))
+ except RedirectionError as ex:
+ self.perror(ex)
except Exception as ex:
self.pexcept(ex)
finally:
@@ -1902,24 +1891,28 @@ class Cmd(cmd.Cmd):
# Restore any terminator, suffix, redirection, etc.
return resolved + statement.post_command
- def _redirect_output(self, statement: Statement) -> Tuple[bool, utils.RedirectionSavedState]:
- """Handles output redirection for >, >>, and |.
+ def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState:
+ """Set up a command's output redirection for >, >>, and |.
:param statement: a parsed statement from the user
:return: A bool telling if an error occurred and a utils.RedirectionSavedState object
+ :raises RedirectionError if an error occurs trying to pipe or redirect
"""
import io
import subprocess
- redir_error = False
+ # Initialize the redirection saved state
+ redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout,
+ self._cur_pipe_proc_reader, self._redirecting)
- # Initialize the saved state
- saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader)
+ # The ProcReader for this command
+ cmd_pipe_proc_reader = None
if not self.allow_redirection:
- return redir_error, saved_state
+ # Don't return since we set some state variables at the end of the function
+ pass
- if statement.pipe_to:
+ elif statement.pipe_to:
# Create a pipe with read and write sides
read_fd, write_fd = os.pipe()
@@ -1955,58 +1948,55 @@ class Cmd(cmd.Cmd):
# Check if the pipe process already exited
if proc.returncode is not None:
- self.perror('Pipe process exited with code {} before command could run'.format(proc.returncode))
subproc_stdin.close()
new_stdout.close()
- redir_error = True
+ raise RedirectionError('Pipe process exited with code {} before command could run'.format(proc.returncode))
else:
- saved_state.redirecting = True
- saved_state.pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr)
+ redir_saved_state.redirecting = True
+ cmd_pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr)
sys.stdout = self.stdout = new_stdout
elif statement.output:
import tempfile
if (not statement.output_to) and (not self._can_clip):
- self.perror("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies")
- redir_error = True
+ raise RedirectionError("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies")
# Redirecting to a file
elif statement.output_to:
# statement.output can only contain REDIRECTION_APPEND or REDIRECTION_OUTPUT
- if statement.output == constants.REDIRECTION_APPEND:
- mode = 'a'
- else:
- mode = 'w'
-
+ mode = 'a' if statement.output == constants.REDIRECTION_APPEND else 'w'
try:
# Use line buffering
new_stdout = open(utils.strip_quotes(statement.output_to), mode=mode, buffering=1)
- saved_state.redirecting = True
- sys.stdout = self.stdout = new_stdout
except OSError as ex:
- self.pexcept('Failed to redirect because - {}'.format(ex))
- redir_error = True
+ raise RedirectionError('Failed to redirect because - {}'.format(ex))
+
+ redir_saved_state.redirecting = True
+ sys.stdout = self.stdout = new_stdout
# Redirecting to a paste buffer
else:
new_stdout = tempfile.TemporaryFile(mode="w+")
- saved_state.redirecting = True
+ redir_saved_state.redirecting = True
sys.stdout = self.stdout = new_stdout
if statement.output == constants.REDIRECTION_APPEND:
self.stdout.write(get_paste_buffer())
self.stdout.flush()
- return redir_error, saved_state
+ # These are updated regardless of whether the command redirected
+ self._cur_pipe_proc_reader = cmd_pipe_proc_reader
+ self._redirecting = redir_saved_state.redirecting
+
+ return redir_saved_state
- def _restore_output(self, statement: Statement, saved_state: utils.RedirectionSavedState) -> None:
- """Handles restoring state after output redirection as well as
- the actual pipe operation if present.
+ def _restore_output(self, statement: Statement, saved_redir_state: utils.RedirectionSavedState) -> None:
+ """Handles restoring state after output redirection
:param statement: Statement object which contains the parsed input from the user
- :param saved_state: contains information needed to restore state data
+ :param saved_redir_state: contains information needed to restore state data
"""
- if saved_state.redirecting:
+ if saved_redir_state.redirecting:
# If we redirected output to the clipboard
if statement.output and not statement.output_to:
self.stdout.seek(0)
@@ -2019,15 +2009,16 @@ class Cmd(cmd.Cmd):
pass
# Restore the stdout values
- self.stdout = saved_state.saved_self_stdout
- sys.stdout = saved_state.saved_sys_stdout
+ self.stdout = saved_redir_state.saved_self_stdout
+ sys.stdout = saved_redir_state.saved_sys_stdout
# Check if we need to wait for the process being piped to
if self._cur_pipe_proc_reader is not None:
self._cur_pipe_proc_reader.wait()
- # Restore _cur_pipe_proc_reader. This always is done, regardless of whether this command redirected.
- self._cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader
+ # These are restored regardless of whether the command redirected
+ self._cur_pipe_proc_reader = saved_redir_state.saved_pipe_proc_reader
+ self._redirecting = saved_redir_state.saved_redirecting
def cmd_func(self, command: str) -> Optional[Callable]:
"""
diff --git a/cmd2/exceptions.py b/cmd2/exceptions.py
index 15787177..635192e1 100644
--- a/cmd2/exceptions.py
+++ b/cmd2/exceptions.py
@@ -24,3 +24,8 @@ class EmbeddedConsoleExit(SystemExit):
class EmptyStatement(Exception):
"""Custom exception class for handling behavior when the user just presses <Enter>."""
pass
+
+
+class RedirectionError(Exception):
+ """Custom exception class for when redirecting or piping output fails"""
+ pass
diff --git a/cmd2/utils.py b/cmd2/utils.py
index 58305a61..0749f32b 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -665,20 +665,26 @@ class ContextFlag:
class RedirectionSavedState:
- """Created by each command to store information about their redirection."""
-
+ """Created by each command to store information required to restore state after redirection"""
def __init__(self, self_stdout: Union[StdSim, TextIO], sys_stdout: Union[StdSim, TextIO],
- pipe_proc_reader: Optional[ProcReader]) -> None:
- # Used to restore values after the command ends
+ pipe_proc_reader: Optional[ProcReader], saved_redirecting: bool) -> None:
+ """
+ RedirectionSavedState initializer
+ :param self_stdout: saved value of Cmd.stdout
+ :param sys_stdout: saved value of sys.stdout
+ :param pipe_proc_reader: saved value of Cmd._cur_pipe_proc_reader
+ :param saved_redirecting: saved value of Cmd._redirecting
+ """
+ # Tells if command is redirecting
+ self.redirecting = False
+
+ # Used to restore values after redirection ends
self.saved_self_stdout = self_stdout
self.saved_sys_stdout = sys_stdout
- self.saved_pipe_proc_reader = pipe_proc_reader
-
- # Tells if the command is redirecting
- self.redirecting = False
- # If the command created a process to pipe to, then then is its reader
- self.pipe_proc_reader = None
+ # Used to restore values after command ends regardless of whether the command redirected
+ self.saved_pipe_proc_reader = pipe_proc_reader
+ self.saved_redirecting = saved_redirecting
# noinspection PyUnusedLocal
diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py
index bb99e15b..fe3f25a6 100755
--- a/tests/test_cmd2.py
+++ b/tests/test_cmd2.py
@@ -480,20 +480,11 @@ def test_output_redirection(base_app):
def test_output_redirection_to_nonexistent_directory(base_app):
filename = '~/fakedir/this_does_not_exist.txt'
- # Verify that writing to a file in a non-existent directory doesn't work
- run_cmd(base_app, 'help > {}'.format(filename))
- with pytest.raises(FileNotFoundError):
- with open(filename) as f:
- content = f.read()
- verify_help_text(base_app, content)
+ out, err = run_cmd(base_app, 'help > {}'.format(filename))
+ assert 'Failed to redirect' in err[0]
- # Verify that appending to a file also works
- run_cmd(base_app, 'help history >> {}'.format(filename))
- with pytest.raises(FileNotFoundError):
- with open(filename) as f:
- appended_content = f.read()
- verify_help_text(base_app, appended_content)
- assert len(appended_content) > len(content)
+ out, err = run_cmd(base_app, 'help >> {}'.format(filename))
+ assert 'Failed to redirect' in err[0]
def test_output_redirection_to_too_long_filename(base_app):
filename = '~/sdkfhksdjfhkjdshfkjsdhfkjsdhfkjdshfkjdshfkjshdfkhdsfkjhewfuihewiufhweiufhiweufhiuewhiuewhfiuwehfia' \
@@ -502,20 +493,11 @@ def test_output_redirection_to_too_long_filename(base_app):
'fheiufhieuwhfewiuhfeiufhiuewfhiuewheiwuhfiuewhfiuewhfeiuwfhewiufhiuewhiuewhfeiuwhfiuwehfuiwehfiuehie' \
'whfieuwfhieufhiuewhfeiuwfhiuefhueiwhfw'
- # Verify that writing to a file in a non-existent directory doesn't work
- run_cmd(base_app, 'help > {}'.format(filename))
- with pytest.raises(OSError):
- with open(filename) as f:
- content = f.read()
- verify_help_text(base_app, content)
+ out, err = run_cmd(base_app, 'help > {}'.format(filename))
+ assert 'Failed to redirect' in err[0]
- # Verify that appending to a file also works
- run_cmd(base_app, 'help history >> {}'.format(filename))
- with pytest.raises(OSError):
- with open(filename) as f:
- appended_content = f.read()
- verify_help_text(base_app, content)
- assert len(appended_content) > len(content)
+ out, err = run_cmd(base_app, 'help >> {}'.format(filename))
+ assert 'Failed to redirect' in err[0]
def test_feedback_to_output_true(base_app):