diff options
-rw-r--r-- | cmd2/cmd2.py | 145 | ||||
-rw-r--r-- | cmd2/exceptions.py | 5 | ||||
-rw-r--r-- | cmd2/utils.py | 26 | ||||
-rwxr-xr-x | tests/test_cmd2.py | 34 |
4 files changed, 97 insertions, 113 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index de7e7f5a..e0c28941 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -49,7 +49,7 @@ from . import utils from .argparse_custom import CompletionItem, DEFAULT_ARGUMENT_PARSER from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .decorators import with_argparser -from .exceptions import Cmd2ArgparseError, Cmd2ShlexError, EmbeddedConsoleExit, EmptyStatement +from .exceptions import Cmd2ArgparseError, Cmd2ShlexError, EmbeddedConsoleExit, EmptyStatement, RedirectionError from .history import History, HistoryItem from .parsing import StatementParser, Statement, Macro, MacroArg, shlex_split from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt, rl_warning @@ -1600,21 +1600,19 @@ class Cmd(cmd.Cmd): import datetime stop = False + statement = None + try: + # Convert the line into a Statement statement = self._input_line_to_statement(line) - except (EmptyStatement, Cmd2ShlexError) as ex: - if isinstance(ex, Cmd2ShlexError): - self.perror("Invalid syntax: {}".format(ex)) - return self._run_cmdfinalization_hooks(stop, None) - # now that we have a statement, run it with all the hooks - try: # call the postparsing hooks data = plugin.PostparsingData(False, statement) for func in self._postparsing_hooks: data = func(data) if data.stop: break + # unpack the data object statement = data.statement stop = data.stop @@ -1623,11 +1621,8 @@ class Cmd(cmd.Cmd): # we need to run the finalization hooks raise EmptyStatement - # Keep track of whether or not we were already _redirecting before this command - already_redirecting = self._redirecting - # This will be a utils.RedirectionSavedState object for the command - saved_state = None + redir_saved_state = None try: # Get sigint protection while we set up redirection @@ -1636,50 +1631,40 @@ class Cmd(cmd.Cmd): # Start saving command's stdout at this point self.stdout.pause_storage = False - redir_error, saved_state = self._redirect_output(statement) - self._cur_pipe_proc_reader = saved_state.pipe_proc_reader + redir_saved_state = self._redirect_output(statement) - # Do not continue if an error occurred while trying to redirect - if not redir_error: - # See if we need to update self._redirecting - if not already_redirecting: - self._redirecting = saved_state.redirecting + timestart = datetime.datetime.now() - timestart = datetime.datetime.now() + # precommand hooks + data = plugin.PrecommandData(statement) + for func in self._precmd_hooks: + data = func(data) + statement = data.statement - # precommand hooks - data = plugin.PrecommandData(statement) - for func in self._precmd_hooks: - data = func(data) - statement = data.statement + # call precmd() for compatibility with cmd.Cmd + statement = self.precmd(statement) - # call precmd() for compatibility with cmd.Cmd - statement = self.precmd(statement) + # go run the command function + stop = self.onecmd(statement, add_to_history=add_to_history) - # go run the command function - stop = self.onecmd(statement, add_to_history=add_to_history) + # postcommand hooks + data = plugin.PostcommandData(stop, statement) + for func in self._postcmd_hooks: + data = func(data) - # postcommand hooks - data = plugin.PostcommandData(stop, statement) - for func in self._postcmd_hooks: - data = func(data) + # retrieve the final value of stop, ignoring any statement modification from the hooks + stop = data.stop - # retrieve the final value of stop, ignoring any statement modification from the hooks - stop = data.stop + # call postcmd() for compatibility with cmd.Cmd + stop = self.postcmd(stop, statement) - # call postcmd() for compatibility with cmd.Cmd - stop = self.postcmd(stop, statement) - - if self.timing: - self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart)) + if self.timing: + self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart)) finally: # Get sigint protection while we restore stuff with self.sigint_protection: - if saved_state is not None: - self._restore_output(statement, saved_state) - - if not already_redirecting: - self._redirecting = False + if redir_saved_state is not None: + self._restore_output(statement, redir_saved_state) if py_bridge_call: # Stop saving command's stdout before command finalization hooks run @@ -1690,6 +1675,10 @@ class Cmd(cmd.Cmd): except (Cmd2ArgparseError, EmptyStatement): # Don't do anything, but do allow command finalization hooks to run pass + except Cmd2ShlexError as ex: + self.perror("Invalid syntax: {}".format(ex)) + except RedirectionError as ex: + self.perror(ex) except Exception as ex: self.pexcept(ex) finally: @@ -1902,24 +1891,28 @@ class Cmd(cmd.Cmd): # Restore any terminator, suffix, redirection, etc. return resolved + statement.post_command - def _redirect_output(self, statement: Statement) -> Tuple[bool, utils.RedirectionSavedState]: - """Handles output redirection for >, >>, and |. + def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState: + """Set up a command's output redirection for >, >>, and |. :param statement: a parsed statement from the user :return: A bool telling if an error occurred and a utils.RedirectionSavedState object + :raises RedirectionError if an error occurs trying to pipe or redirect """ import io import subprocess - redir_error = False + # Initialize the redirection saved state + redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, + self._cur_pipe_proc_reader, self._redirecting) - # Initialize the saved state - saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader) + # The ProcReader for this command + cmd_pipe_proc_reader = None if not self.allow_redirection: - return redir_error, saved_state + # Don't return since we set some state variables at the end of the function + pass - if statement.pipe_to: + elif statement.pipe_to: # Create a pipe with read and write sides read_fd, write_fd = os.pipe() @@ -1955,58 +1948,55 @@ class Cmd(cmd.Cmd): # Check if the pipe process already exited if proc.returncode is not None: - self.perror('Pipe process exited with code {} before command could run'.format(proc.returncode)) subproc_stdin.close() new_stdout.close() - redir_error = True + raise RedirectionError('Pipe process exited with code {} before command could run'.format(proc.returncode)) else: - saved_state.redirecting = True - saved_state.pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) + redir_saved_state.redirecting = True + cmd_pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) sys.stdout = self.stdout = new_stdout elif statement.output: import tempfile if (not statement.output_to) and (not self._can_clip): - self.perror("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies") - redir_error = True + raise RedirectionError("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies") # Redirecting to a file elif statement.output_to: # statement.output can only contain REDIRECTION_APPEND or REDIRECTION_OUTPUT - if statement.output == constants.REDIRECTION_APPEND: - mode = 'a' - else: - mode = 'w' - + mode = 'a' if statement.output == constants.REDIRECTION_APPEND else 'w' try: # Use line buffering new_stdout = open(utils.strip_quotes(statement.output_to), mode=mode, buffering=1) - saved_state.redirecting = True - sys.stdout = self.stdout = new_stdout except OSError as ex: - self.pexcept('Failed to redirect because - {}'.format(ex)) - redir_error = True + raise RedirectionError('Failed to redirect because - {}'.format(ex)) + + redir_saved_state.redirecting = True + sys.stdout = self.stdout = new_stdout # Redirecting to a paste buffer else: new_stdout = tempfile.TemporaryFile(mode="w+") - saved_state.redirecting = True + redir_saved_state.redirecting = True sys.stdout = self.stdout = new_stdout if statement.output == constants.REDIRECTION_APPEND: self.stdout.write(get_paste_buffer()) self.stdout.flush() - return redir_error, saved_state + # These are updated regardless of whether the command redirected + self._cur_pipe_proc_reader = cmd_pipe_proc_reader + self._redirecting = redir_saved_state.redirecting + + return redir_saved_state - def _restore_output(self, statement: Statement, saved_state: utils.RedirectionSavedState) -> None: - """Handles restoring state after output redirection as well as - the actual pipe operation if present. + def _restore_output(self, statement: Statement, saved_redir_state: utils.RedirectionSavedState) -> None: + """Handles restoring state after output redirection :param statement: Statement object which contains the parsed input from the user - :param saved_state: contains information needed to restore state data + :param saved_redir_state: contains information needed to restore state data """ - if saved_state.redirecting: + if saved_redir_state.redirecting: # If we redirected output to the clipboard if statement.output and not statement.output_to: self.stdout.seek(0) @@ -2019,15 +2009,16 @@ class Cmd(cmd.Cmd): pass # Restore the stdout values - self.stdout = saved_state.saved_self_stdout - sys.stdout = saved_state.saved_sys_stdout + self.stdout = saved_redir_state.saved_self_stdout + sys.stdout = saved_redir_state.saved_sys_stdout # Check if we need to wait for the process being piped to if self._cur_pipe_proc_reader is not None: self._cur_pipe_proc_reader.wait() - # Restore _cur_pipe_proc_reader. This always is done, regardless of whether this command redirected. - self._cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader + # These are restored regardless of whether the command redirected + self._cur_pipe_proc_reader = saved_redir_state.saved_pipe_proc_reader + self._redirecting = saved_redir_state.saved_redirecting def cmd_func(self, command: str) -> Optional[Callable]: """ diff --git a/cmd2/exceptions.py b/cmd2/exceptions.py index 15787177..635192e1 100644 --- a/cmd2/exceptions.py +++ b/cmd2/exceptions.py @@ -24,3 +24,8 @@ class EmbeddedConsoleExit(SystemExit): class EmptyStatement(Exception): """Custom exception class for handling behavior when the user just presses <Enter>.""" pass + + +class RedirectionError(Exception): + """Custom exception class for when redirecting or piping output fails""" + pass diff --git a/cmd2/utils.py b/cmd2/utils.py index 58305a61..0749f32b 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -665,20 +665,26 @@ class ContextFlag: class RedirectionSavedState: - """Created by each command to store information about their redirection.""" - + """Created by each command to store information required to restore state after redirection""" def __init__(self, self_stdout: Union[StdSim, TextIO], sys_stdout: Union[StdSim, TextIO], - pipe_proc_reader: Optional[ProcReader]) -> None: - # Used to restore values after the command ends + pipe_proc_reader: Optional[ProcReader], saved_redirecting: bool) -> None: + """ + RedirectionSavedState initializer + :param self_stdout: saved value of Cmd.stdout + :param sys_stdout: saved value of sys.stdout + :param pipe_proc_reader: saved value of Cmd._cur_pipe_proc_reader + :param saved_redirecting: saved value of Cmd._redirecting + """ + # Tells if command is redirecting + self.redirecting = False + + # Used to restore values after redirection ends self.saved_self_stdout = self_stdout self.saved_sys_stdout = sys_stdout - self.saved_pipe_proc_reader = pipe_proc_reader - - # Tells if the command is redirecting - self.redirecting = False - # If the command created a process to pipe to, then then is its reader - self.pipe_proc_reader = None + # Used to restore values after command ends regardless of whether the command redirected + self.saved_pipe_proc_reader = pipe_proc_reader + self.saved_redirecting = saved_redirecting # noinspection PyUnusedLocal diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index bb99e15b..fe3f25a6 100755 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -480,20 +480,11 @@ def test_output_redirection(base_app): def test_output_redirection_to_nonexistent_directory(base_app): filename = '~/fakedir/this_does_not_exist.txt' - # Verify that writing to a file in a non-existent directory doesn't work - run_cmd(base_app, 'help > {}'.format(filename)) - with pytest.raises(FileNotFoundError): - with open(filename) as f: - content = f.read() - verify_help_text(base_app, content) + out, err = run_cmd(base_app, 'help > {}'.format(filename)) + assert 'Failed to redirect' in err[0] - # Verify that appending to a file also works - run_cmd(base_app, 'help history >> {}'.format(filename)) - with pytest.raises(FileNotFoundError): - with open(filename) as f: - appended_content = f.read() - verify_help_text(base_app, appended_content) - assert len(appended_content) > len(content) + out, err = run_cmd(base_app, 'help >> {}'.format(filename)) + assert 'Failed to redirect' in err[0] def test_output_redirection_to_too_long_filename(base_app): filename = '~/sdkfhksdjfhkjdshfkjsdhfkjsdhfkjdshfkjdshfkjshdfkhdsfkjhewfuihewiufhweiufhiweufhiuewhiuewhfiuwehfia' \ @@ -502,20 +493,11 @@ def test_output_redirection_to_too_long_filename(base_app): 'fheiufhieuwhfewiuhfeiufhiuewfhiuewheiwuhfiuewhfiuewhfeiuwfhewiufhiuewhiuewhfeiuwhfiuwehfuiwehfiuehie' \ 'whfieuwfhieufhiuewhfeiuwfhiuefhueiwhfw' - # Verify that writing to a file in a non-existent directory doesn't work - run_cmd(base_app, 'help > {}'.format(filename)) - with pytest.raises(OSError): - with open(filename) as f: - content = f.read() - verify_help_text(base_app, content) + out, err = run_cmd(base_app, 'help > {}'.format(filename)) + assert 'Failed to redirect' in err[0] - # Verify that appending to a file also works - run_cmd(base_app, 'help history >> {}'.format(filename)) - with pytest.raises(OSError): - with open(filename) as f: - appended_content = f.read() - verify_help_text(base_app, content) - assert len(appended_content) > len(content) + out, err = run_cmd(base_app, 'help >> {}'.format(filename)) + assert 'Failed to redirect' in err[0] def test_feedback_to_output_true(base_app): |