diff options
-rwxr-xr-x | cmd2.py | 95 | ||||
-rw-r--r-- | tests/test_cmd2.py | 49 | ||||
-rw-r--r-- | tox.ini | 1 |
3 files changed, 89 insertions, 56 deletions
@@ -30,13 +30,13 @@ import codecs import collections import datetime import glob +import io import optparse import os import platform import re import shlex import six -import subprocess import sys import tempfile import traceback @@ -59,6 +59,13 @@ import six.moves as sm # itertools.zip() for Python 2 or zip() for Python 3 - produces an iterator in both cases from six.moves import zip +# If using Python 2.7, try to use the subprocess32 package backported from Python 3.2 due to various improvements +# NOTE: The feature to pipe output to a shell command won't work correctly in Python 2.7 without this +try: + import subprocess32 as subprocess +except ImportError: + import subprocess + # Detect whether IPython is installed to determine if the built-in "ipy" command should be included ipython_available = True try: @@ -513,9 +520,6 @@ class Cmd(cmd.Cmd): self.kept_state = None self.kept_sys = None - # Used for a temp file during a pipe (needed tempfile instead of real pipe for Python 3.x prior to 3.5) - self._temp_filename = None - # Codes used for exit conditions self._STOP_AND_EXIT = True # cmd convention @@ -531,6 +535,9 @@ class Cmd(cmd.Cmd): # Used load command to store the current script dir as a LIFO queue to support _relative_load command self._script_dir = [] + # Used when piping command output to a shell command + self.pipe_proc = None + # ----- Methods related to presenting output to the user ----- @property @@ -765,18 +772,29 @@ class Cmd(cmd.Cmd): """ if statement.parsed.pipeTo: self.kept_state = Statekeeper(self, ('stdout',)) - self.kept_sys = Statekeeper(sys, ('stdout',)) - sys.stdout = self.stdout - # NOTE: We couldn't get a real pipe working via subprocess for Python 3.x prior to 3.5. - # So to allow compatibility with Python 2.7 and 3.3+ we are redirecting output to a temporary file. - # And once command is complete we are the temp file as stdin for the shell command to pipe to. - # TODO: Once support for Python 3.x prior to 3.5 is no longer necessary, replace with a real subprocess pipe + # Create a pipe with read and write sides + read_fd, write_fd = os.pipe() - # Redirect stdout to a temporary file - fd, self._temp_filename = tempfile.mkstemp() - os.close(fd) - self.stdout = open(self._temp_filename, 'w') + # Open each side of the pipe and set stdout accordingly + # noinspection PyTypeChecker + self.stdout = io.open(write_fd, 'w') + # noinspection PyTypeChecker + subproc_stdin = io.open(read_fd, 'r') + + # If you don't set shell=True, subprocess failure will throw an exception + try: + self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=subproc_stdin) + except Exception as ex: + # Restore stdout to what it was and close the pipe + self.stdout.close() + subproc_stdin.close() + self.pipe_proc = None + self.kept_state.restore() + self.kept_state = None + + # Re-raise the exception + raise ex elif statement.parsed.output: if (not statement.parsed.outputTo) and (not can_clip): raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') @@ -797,32 +815,29 @@ class Cmd(cmd.Cmd): :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance """ - if self.kept_state: - try: - if statement.parsed.output: - if not statement.parsed.outputTo: - self.stdout.seek(0) - write_to_paste_buffer(self.stdout.read()) - finally: - self.stdout.close() - self.kept_state.restore() - self.kept_sys.restore() - self.kept_state = None - - if statement.parsed.pipeTo: - # Pipe the contents of tempfile to the specified shell command - with open(self._temp_filename) as fd: - pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=fd, - stdout=subprocess.PIPE) - output, _ = pipe_proc.communicate() - - if six.PY3: - self.stdout.write(output.decode()) - else: - self.stdout.write(output) - - os.remove(self._temp_filename) - self._temp_filename = None + # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state + if self.kept_state is not None: + # If we redirected output to the clipboard + if statement.parsed.output and not statement.parsed.outputTo: + self.stdout.seek(0) + write_to_paste_buffer(self.stdout.read()) + + # Close the file or pipe that stdout was redirected to + self.stdout.close() + + # If we were piping output to a shell command, then close the subprocess the shell command was running in + if self.pipe_proc is not None: + self.pipe_proc.communicate() + self.pipe_proc = None + + # Restore self.stdout + self.kept_state.restore() + self.kept_state = None + + # Restore sys.stdout if need be + if self.kept_sys is not None: + self.kept_sys.restore() + self.kept_sys = None def _func_named(self, arg): """Gets the method name associated with a given command. diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index f41b7ed1..3a53a9c3 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -543,33 +543,50 @@ def test_input_redirection(base_app, request): # NOTE: File 'redirect.txt" contains 1 word "history" - # Verify that redirecting input from a file works + # Verify that redirecting input ffom a file works out = run_cmd(base_app, 'help < {}'.format(filename)) expected = normalize(HELP_HISTORY) assert out == expected -def test_pipe_to_shell(base_app): +def test_pipe_to_shell(base_app, capsys): if sys.platform == "win32": # Windows + command = 'help | sort' # Get help menu and pipe it's output to the sort shell command - out = run_cmd(base_app, 'help | sort') - expected = ['', '', '_relative_load edit history py quit save shell show', - '========================================', - 'cmdenvironment help load pyscript run set shortcuts', - 'Documented commands (type help <topic>):'] - assert out == expected + run_cmd(base_app, 'help | sort') + # expected = ['', '', '_relative_load edit history py quit save shell show', + # '========================================', + # 'cmdenvironment help load pyscript run set shortcuts', + # 'Documented commands (type help <topic>):'] + # assert out == expected else: # Mac and Linux # Get help on help and pipe it's output to the input of the word count shell command - out = run_cmd(base_app, 'help help | wc') - - # Mac and Linux wc behave the same when piped from shell, but differently when piped stdin from file directly - if sys.platform == 'darwin': - expected = normalize("1 11 70") - else: - expected = normalize("1 11 70") - assert out[0].strip() == expected[0].strip() + command = 'help help | wc' + # # Mac and Linux wc behave the same when piped from shell, but differently when piped stdin from file directly + # if sys.platform == 'darwin': + # expected = "1 11 70" + # else: + # expected = "1 11 70" + # assert out.strip() == expected.strip() + + run_cmd(base_app, command) + out, err = capsys.readouterr() + + # Unfortunately with the improved way of piping output to a subprocess, there isn't any good way of getting + # access to the output produced by that subprocess within a unit test, but we can verify that no error occured + assert not err + + + +def test_pipe_to_shell_error(base_app, capsys): + # Try to pipe command output to a shell command that doesn't exist in order to produce an error + run_cmd(base_app, 'help | foobarbaz.this_does_not_exist') + out, err = capsys.readouterr() + + assert not out + assert err.startswith("EXCEPTION of type 'FileNotFoundError' occurred with message:") @pytest.mark.skipif(not cmd2.can_clip, @@ -19,6 +19,7 @@ deps = pytest-cov pytest-xdist six + subprocess32 commands = py.test {posargs: -n 2} --cov=cmd2 --cov-report=term-missing codecov |