summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md3
-rw-r--r--cmd2/utils.py39
-rw-r--r--tests/test_utils.py31
3 files changed, 52 insertions, 21 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c67fee2c..561ddcee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,9 @@
* Enhancements
* `pyscript` limits a command's stdout capture to the same period that redirection does.
Therefore output from a command's postparsing and finalization hooks isn't saved in the StdSim object.
+ * `StdSim.buffer.write()` now flushes when the wrapped stream uses line buffering and the bytes being written
+ contain a newline or carriage return. This helps when `pyscript` is echoing the output of a shell command
+ since the output will print at the same frequency as when the command is run in a terminal.
## 0.9.12 (April 22, 2019)
* Bug Fixes
diff --git a/cmd2/utils.py b/cmd2/utils.py
index 35c953e0..44a58c35 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -8,7 +8,7 @@ import subprocess
import sys
import threading
import unicodedata
-from typing import Any, BinaryIO, Iterable, List, Optional, TextIO, Union
+from typing import Any, Iterable, List, Optional, TextIO, Union
from wcwidth import wcswidth
@@ -297,7 +297,7 @@ class StdSim(object):
encoding: str = 'utf-8', errors: str = 'replace') -> None:
"""
Initializer
- :param inner_stream: the emulated stream
+ :param inner_stream: the wrapped stream. Should be a TextIO or StdSim instance.
:param echo: if True, then all input will be echoed to inner_stream
:param encoding: codec for encoding/decoding strings (defaults to utf-8)
:param errors: how to handle encoding/decoding errors (defaults to replace)
@@ -350,6 +350,17 @@ class StdSim(object):
else:
return False
+ @property
+ def line_buffering(self) -> bool:
+ """
+ Handle when the inner stream doesn't have a line_buffering attribute which is the case
+ when running unit tests because pytest sets stdout to a pytest EncodedFile object.
+ """
+ try:
+ return self.inner_stream.line_buffering
+ except AttributeError:
+ return False
+
def __getattr__(self, item: str):
if item in self.__dict__:
return self.__dict__[item]
@@ -361,6 +372,9 @@ class ByteBuf(object):
"""
Used by StdSim to write binary data and stores the actual bytes written
"""
+ # Used to know when to flush the StdSim
+ NEWLINES = [b'\n', b'\r']
+
def __init__(self, std_sim_instance: StdSim) -> None:
self.byte_buf = bytearray()
self.std_sim_instance = std_sim_instance
@@ -374,14 +388,22 @@ class ByteBuf(object):
if self.std_sim_instance.echo:
self.std_sim_instance.inner_stream.buffer.write(b)
+ # Since StdSim wraps TextIO streams, we will flush the stream if line buffering is on
+ # and the bytes being written contain a new line character. This is helpful when StdSim
+ # is being used to capture output of a shell command because it causes the output to print
+ # to the screen more often than if we waited for the stream to flush its buffer.
+ if self.std_sim_instance.line_buffering:
+ if any(newline in b for newline in ByteBuf.NEWLINES):
+ self.std_sim_instance.flush()
+
class ProcReader(object):
"""
Used to captured stdout and stderr from a Popen process if any of those were set to subprocess.PIPE.
If neither are pipes, then the process will run normally and no output will be captured.
"""
- def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, BinaryIO, TextIO],
- stderr: Union[StdSim, BinaryIO, TextIO]) -> None:
+ def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, TextIO],
+ stderr: Union[StdSim, TextIO]) -> None:
"""
ProcReader initializer
:param proc: the Popen process being read from
@@ -457,17 +479,14 @@ class ProcReader(object):
self._write_bytes(write_stream, available)
@staticmethod
- def _write_bytes(stream: Union[StdSim, BinaryIO, TextIO], to_write: bytes) -> None:
+ def _write_bytes(stream: Union[StdSim, TextIO], to_write: bytes) -> None:
"""
Write bytes to a stream
:param stream: the stream being written to
:param to_write: the bytes being written
"""
try:
- if hasattr(stream, 'buffer'):
- stream.buffer.write(to_write)
- else:
- stream.write(to_write)
+ stream.buffer.write(to_write)
except BrokenPipeError:
# This occurs if output is being piped to a process that closed
pass
@@ -500,7 +519,7 @@ class ContextFlag(object):
class RedirectionSavedState(object):
"""Created by each command to store information about their redirection."""
- def __init__(self, self_stdout: Union[StdSim, BinaryIO, TextIO], sys_stdout: Union[StdSim, BinaryIO, TextIO],
+ def __init__(self, self_stdout: Union[StdSim, TextIO], sys_stdout: Union[StdSim, TextIO],
pipe_proc_reader: Optional[ProcReader]) -> None:
# Used to restore values after the command ends
self.saved_self_stdout = self_stdout
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 4574e579..c0b16990 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -126,23 +126,12 @@ def stdout_sim():
stdsim = cu.StdSim(sys.stdout, echo=True)
return stdsim
-@pytest.fixture
-def stringio_sim():
- import io
- stdsim = cu.StdSim(io.StringIO(), echo=True)
- return stdsim
-
def test_stdsim_write_str(stdout_sim):
my_str = 'Hello World'
stdout_sim.write(my_str)
assert stdout_sim.getvalue() == my_str
-def test_stdsim_write_str_inner_no_buffer(stringio_sim):
- my_str = 'Hello World'
- stringio_sim.write(my_str)
- assert stringio_sim.getvalue() == my_str
-
def test_stdsim_write_bytes(stdout_sim):
b_str = b'Hello World'
with pytest.raises(TypeError):
@@ -218,6 +207,26 @@ def test_stdsim_pause_storage(stdout_sim):
stdout_sim.buffer.write(b_str)
assert stdout_sim.getbytes() == b''
+def test_stdsim_line_buffering(base_app):
+ # This exercises the case of writing binary data that contains new lines/carriage returns to a StdSim
+ # when line buffering is on. The output should immediately be flushed to the underlying stream.
+ import os
+ import tempfile
+ file = tempfile.NamedTemporaryFile(mode='wt')
+ file.line_buffering = True
+
+ stdsim = cu.StdSim(file, echo=True)
+ saved_size = os.path.getsize(file.name)
+
+ bytes_to_write = b'hello\n'
+ stdsim.buffer.write(bytes_to_write)
+ assert os.path.getsize(file.name) == saved_size + len(bytes_to_write)
+ saved_size = os.path.getsize(file.name)
+
+ bytes_to_write = b'hello\r'
+ stdsim.buffer.write(bytes_to_write)
+ assert os.path.getsize(file.name) == saved_size + len(bytes_to_write)
+
@pytest.fixture
def pr_none():