diff options
-rw-r--r-- | CHANGELOG.md | 3 | ||||
-rw-r--r-- | cmd2/utils.py | 39 | ||||
-rw-r--r-- | tests/test_utils.py | 31 |
3 files changed, 52 insertions, 21 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index c67fee2c..561ddcee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ * Enhancements * `pyscript` limits a command's stdout capture to the same period that redirection does. Therefore output from a command's postparsing and finalization hooks isn't saved in the StdSim object. + * `StdSim.buffer.write()` now flushes when the wrapped stream uses line buffering and the bytes being written + contain a newline or carriage return. This helps when `pyscript` is echoing the output of a shell command + since the output will print at the same frequency as when the command is run in a terminal. ## 0.9.12 (April 22, 2019) * Bug Fixes diff --git a/cmd2/utils.py b/cmd2/utils.py index 35c953e0..44a58c35 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -8,7 +8,7 @@ import subprocess import sys import threading import unicodedata -from typing import Any, BinaryIO, Iterable, List, Optional, TextIO, Union +from typing import Any, Iterable, List, Optional, TextIO, Union from wcwidth import wcswidth @@ -297,7 +297,7 @@ class StdSim(object): encoding: str = 'utf-8', errors: str = 'replace') -> None: """ Initializer - :param inner_stream: the emulated stream + :param inner_stream: the wrapped stream. Should be a TextIO or StdSim instance. :param echo: if True, then all input will be echoed to inner_stream :param encoding: codec for encoding/decoding strings (defaults to utf-8) :param errors: how to handle encoding/decoding errors (defaults to replace) @@ -350,6 +350,17 @@ class StdSim(object): else: return False + @property + def line_buffering(self) -> bool: + """ + Handle when the inner stream doesn't have a line_buffering attribute which is the case + when running unit tests because pytest sets stdout to a pytest EncodedFile object. + """ + try: + return self.inner_stream.line_buffering + except AttributeError: + return False + def __getattr__(self, item: str): if item in self.__dict__: return self.__dict__[item] @@ -361,6 +372,9 @@ class ByteBuf(object): """ Used by StdSim to write binary data and stores the actual bytes written """ + # Used to know when to flush the StdSim + NEWLINES = [b'\n', b'\r'] + def __init__(self, std_sim_instance: StdSim) -> None: self.byte_buf = bytearray() self.std_sim_instance = std_sim_instance @@ -374,14 +388,22 @@ class ByteBuf(object): if self.std_sim_instance.echo: self.std_sim_instance.inner_stream.buffer.write(b) + # Since StdSim wraps TextIO streams, we will flush the stream if line buffering is on + # and the bytes being written contain a new line character. This is helpful when StdSim + # is being used to capture output of a shell command because it causes the output to print + # to the screen more often than if we waited for the stream to flush its buffer. + if self.std_sim_instance.line_buffering: + if any(newline in b for newline in ByteBuf.NEWLINES): + self.std_sim_instance.flush() + class ProcReader(object): """ Used to captured stdout and stderr from a Popen process if any of those were set to subprocess.PIPE. If neither are pipes, then the process will run normally and no output will be captured. """ - def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, BinaryIO, TextIO], - stderr: Union[StdSim, BinaryIO, TextIO]) -> None: + def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, TextIO], + stderr: Union[StdSim, TextIO]) -> None: """ ProcReader initializer :param proc: the Popen process being read from @@ -457,17 +479,14 @@ class ProcReader(object): self._write_bytes(write_stream, available) @staticmethod - def _write_bytes(stream: Union[StdSim, BinaryIO, TextIO], to_write: bytes) -> None: + def _write_bytes(stream: Union[StdSim, TextIO], to_write: bytes) -> None: """ Write bytes to a stream :param stream: the stream being written to :param to_write: the bytes being written """ try: - if hasattr(stream, 'buffer'): - stream.buffer.write(to_write) - else: - stream.write(to_write) + stream.buffer.write(to_write) except BrokenPipeError: # This occurs if output is being piped to a process that closed pass @@ -500,7 +519,7 @@ class ContextFlag(object): class RedirectionSavedState(object): """Created by each command to store information about their redirection.""" - def __init__(self, self_stdout: Union[StdSim, BinaryIO, TextIO], sys_stdout: Union[StdSim, BinaryIO, TextIO], + def __init__(self, self_stdout: Union[StdSim, TextIO], sys_stdout: Union[StdSim, TextIO], pipe_proc_reader: Optional[ProcReader]) -> None: # Used to restore values after the command ends self.saved_self_stdout = self_stdout diff --git a/tests/test_utils.py b/tests/test_utils.py index 4574e579..c0b16990 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -126,23 +126,12 @@ def stdout_sim(): stdsim = cu.StdSim(sys.stdout, echo=True) return stdsim -@pytest.fixture -def stringio_sim(): - import io - stdsim = cu.StdSim(io.StringIO(), echo=True) - return stdsim - def test_stdsim_write_str(stdout_sim): my_str = 'Hello World' stdout_sim.write(my_str) assert stdout_sim.getvalue() == my_str -def test_stdsim_write_str_inner_no_buffer(stringio_sim): - my_str = 'Hello World' - stringio_sim.write(my_str) - assert stringio_sim.getvalue() == my_str - def test_stdsim_write_bytes(stdout_sim): b_str = b'Hello World' with pytest.raises(TypeError): @@ -218,6 +207,26 @@ def test_stdsim_pause_storage(stdout_sim): stdout_sim.buffer.write(b_str) assert stdout_sim.getbytes() == b'' +def test_stdsim_line_buffering(base_app): + # This exercises the case of writing binary data that contains new lines/carriage returns to a StdSim + # when line buffering is on. The output should immediately be flushed to the underlying stream. + import os + import tempfile + file = tempfile.NamedTemporaryFile(mode='wt') + file.line_buffering = True + + stdsim = cu.StdSim(file, echo=True) + saved_size = os.path.getsize(file.name) + + bytes_to_write = b'hello\n' + stdsim.buffer.write(bytes_to_write) + assert os.path.getsize(file.name) == saved_size + len(bytes_to_write) + saved_size = os.path.getsize(file.name) + + bytes_to_write = b'hello\r' + stdsim.buffer.write(bytes_to_write) + assert os.path.getsize(file.name) == saved_size + len(bytes_to_write) + @pytest.fixture def pr_none(): |