summaryrefslogtreecommitdiff
path: root/cmd2/utils.py
diff options
context:
space:
mode:
authorTodd Leonhardt <todd.leonhardt@gmail.com>2019-04-03 16:30:48 -0400
committerGitHub <noreply@github.com>2019-04-03 16:30:48 -0400
commitf3b9a35e52b3b1d40e1d4f3a749e8faf32fb3d7f (patch)
treedb0827201bff299023bd9563041f13a2a39a2af4 /cmd2/utils.py
parent6fd9cc69e58a9b85ae5579ec20de429ed2cfef5e (diff)
parent626a01a5fad70437b0e3f630b653ed9c8cf9c616 (diff)
downloadcmd2-git-f3b9a35e52b3b1d40e1d4f3a749e8faf32fb3d7f.tar.gz
Merge pull request #655 from python-cmd2/capture_popen
Capture popen
Diffstat (limited to 'cmd2/utils.py')
-rw-r--r--cmd2/utils.py196
1 files changed, 171 insertions, 25 deletions
diff --git a/cmd2/utils.py b/cmd2/utils.py
index f3c29227..d8ac513d 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -1,13 +1,14 @@
-#
# coding=utf-8
"""Shared utility functions"""
import collections
import os
import re
+import subprocess
import sys
+import threading
import unicodedata
-from typing import Any, Iterable, List, Optional, Union
+from typing import Any, BinaryIO, Iterable, List, Optional, TextIO, Union
from wcwidth import wcswidth
@@ -140,7 +141,6 @@ def which(editor: str) -> Optional[str]:
:param editor: filename of the editor to check, ie 'notepad.exe' or 'vi'
:return: a full path or None
"""
- import subprocess
try:
editor_path = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT).strip()
editor_path = editor_path.decode()
@@ -262,6 +262,32 @@ def natural_sort(list_to_sort: Iterable[str]) -> List[str]:
return sorted(list_to_sort, key=natural_keys)
+def unquote_redirection_tokens(args: List[str]) -> None:
+ """
+ Unquote redirection tokens in a list of command-line arguments
+ This is used when redirection tokens have to be passed to another command
+ :param args: the command line args
+ """
+ for i, arg in enumerate(args):
+ unquoted_arg = strip_quotes(arg)
+ if unquoted_arg in constants.REDIRECTION_TOKENS:
+ args[i] = unquoted_arg
+
+
+def find_editor() -> str:
+ """Find a reasonable editor to use by default for the system that the cmd2 application is running on."""
+ editor = os.environ.get('EDITOR')
+ if not editor:
+ if sys.platform[:3] == 'win':
+ editor = 'notepad'
+ else:
+ # Favor command-line editors first so we don't leave the terminal to edit
+ for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']:
+ if which(editor):
+ break
+ return editor
+
+
class StdSim(object):
"""
Class to simulate behavior of sys.stdout or sys.stderr.
@@ -315,7 +341,14 @@ class StdSim(object):
def clear(self) -> None:
"""Clear the internal contents"""
- self.buffer.byte_buf = b''
+ self.buffer.byte_buf = bytearray()
+
+ def isatty(self) -> bool:
+ """StdSim only considered an interactive stream if `echo` is True and `inner_stream` is a tty."""
+ if self.echo:
+ return self.inner_stream.isatty()
+ else:
+ return False
def __getattr__(self, item: str):
if item in self.__dict__:
@@ -329,7 +362,7 @@ class ByteBuf(object):
Used by StdSim to write binary data and stores the actual bytes written
"""
def __init__(self, std_sim_instance: StdSim) -> None:
- self.byte_buf = b''
+ self.byte_buf = bytearray()
self.std_sim_instance = std_sim_instance
def write(self, b: bytes) -> None:
@@ -342,27 +375,140 @@ class ByteBuf(object):
self.std_sim_instance.inner_stream.buffer.write(b)
-def unquote_redirection_tokens(args: List[str]) -> None:
+class ProcReader(object):
"""
- Unquote redirection tokens in a list of command-line arguments
- This is used when redirection tokens have to be passed to another command
- :param args: the command line args
+ Used to captured stdout and stderr from a Popen process if any of those were set to subprocess.PIPE.
+ If neither are pipes, then the process will run normally and no output will be captured.
"""
- for i, arg in enumerate(args):
- unquoted_arg = strip_quotes(arg)
- if unquoted_arg in constants.REDIRECTION_TOKENS:
- args[i] = unquoted_arg
+ def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, BinaryIO, TextIO],
+ stderr: Union[StdSim, BinaryIO, TextIO]) -> None:
+ """
+ ProcReader initializer
+ :param proc: the Popen process being read from
+ :param stdout: the stream to write captured stdout
+ :param stderr: the stream to write captured stderr
+ """
+ self._proc = proc
+ self._stdout = stdout
+ self._stderr = stderr
+
+ self._out_thread = threading.Thread(name='out_thread', target=self._reader_thread_func,
+ kwargs={'read_stdout': True})
+
+ self._err_thread = threading.Thread(name='out_thread', target=self._reader_thread_func,
+ kwargs={'read_stdout': False})
+
+ # Start the reader threads for pipes only
+ if self._proc.stdout is not None:
+ self._out_thread.start()
+ if self._proc.stderr is not None:
+ self._err_thread.start()
+
+ def send_sigint(self) -> None:
+ """Send a SIGINT to the process similar to if <Ctrl>+C were pressed."""
+ import signal
+ if sys.platform.startswith('win'):
+ signal_to_send = signal.CTRL_C_EVENT
+ else:
+ signal_to_send = signal.SIGINT
+ self._proc.send_signal(signal_to_send)
+
+ def terminate(self) -> None:
+ """Terminate the process"""
+ self._proc.terminate()
+
+ def wait(self) -> None:
+ """Wait for the process to finish"""
+ if self._out_thread.is_alive():
+ self._out_thread.join()
+ if self._err_thread.is_alive():
+ self._err_thread.join()
+
+ # Handle case where the process ended before the last read could be done.
+ # This will return None for the streams that weren't pipes.
+ out, err = self._proc.communicate()
+
+ if out:
+ self._write_bytes(self._stdout, out)
+ if err:
+ self._write_bytes(self._stderr, err)
+
+ def _reader_thread_func(self, read_stdout: bool) -> None:
+ """
+ Thread function that reads a stream from the process
+ :param read_stdout: if True, then this thread deals with stdout. Otherwise it deals with stderr.
+ """
+ if read_stdout:
+ read_stream = self._proc.stdout
+ write_stream = self._stdout
+ else:
+ read_stream = self._proc.stderr
+ write_stream = self._stderr
+
+ # The thread should have been started only if this stream was a pipe
+ assert read_stream is not None
+
+ # Run until process completes
+ while self._proc.poll() is None:
+ # noinspection PyUnresolvedReferences
+ available = read_stream.peek()
+ if available:
+ read_stream.read(len(available))
+ self._write_bytes(write_stream, available)
+
+ @staticmethod
+ def _write_bytes(stream: Union[StdSim, BinaryIO, TextIO], to_write: bytes) -> None:
+ """
+ Write bytes to a stream
+ :param stream: the stream being written to
+ :param to_write: the bytes being written
+ """
+ try:
+ if hasattr(stream, 'buffer'):
+ stream.buffer.write(to_write)
+ else:
+ stream.write(to_write)
+ except BrokenPipeError:
+ # This occurs if output is being piped to a process that closed
+ pass
-def find_editor() -> str:
- """Find a reasonable editor to use by default for the system that the cmd2 application is running on."""
- editor = os.environ.get('EDITOR')
- if not editor:
- if sys.platform[:3] == 'win':
- editor = 'notepad'
- else:
- # Favor command-line editors first so we don't leave the terminal to edit
- for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']:
- if which(editor):
- break
- return editor
+class ContextFlag(object):
+ """A context manager which is also used as a boolean flag value within the default sigint handler.
+
+ Its main use is as a flag to prevent the SIGINT handler in cmd2 from raising a KeyboardInterrupt
+ while a critical code section has set the flag to True. Because signal handling is always done on the
+ main thread, this class is not thread-safe since there is no need.
+ """
+ def __init__(self) -> None:
+ # When this flag has a positive value, it is considered set.
+ # When it is 0, it is not set. It should never go below 0.
+ self.__count = 0
+
+ def __bool__(self) -> bool:
+ return self.__count > 0
+
+ def __enter__(self) -> None:
+ self.__count += 1
+
+ def __exit__(self, *args) -> None:
+ self.__count -= 1
+ if self.__count < 0:
+ raise ValueError("count has gone below 0")
+
+
+class RedirectionSavedState(object):
+ """Created by each command to store information about their redirection."""
+
+ def __init__(self, self_stdout: Union[StdSim, BinaryIO, TextIO], sys_stdout: Union[StdSim, BinaryIO, TextIO],
+ pipe_proc_reader: Optional[ProcReader]) -> None:
+ # Used to restore values after the command ends
+ self.saved_self_stdout = self_stdout
+ self.saved_sys_stdout = sys_stdout
+ self.saved_pipe_proc_reader = pipe_proc_reader
+
+ # Tells if the command is redirecting
+ self.redirecting = False
+
+ # If the command created a process to pipe to, then then is its reader
+ self.pipe_proc_reader = None