diff options
Diffstat (limited to 'tests/test_misc.py')
-rwxr-xr-x | tests/test_misc.py | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/tests/test_misc.py b/tests/test_misc.py index d5a707c..e439240 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -24,6 +24,7 @@ import re import signal import time import tempfile +import threading import os import pexpect @@ -41,6 +42,42 @@ else: return s +class DiscardOutput(threading.Thread): + def __init__(self, child): + self.child = child + threading.Thread.__init__(self) + + def run(self): + # discard all output, expecting to program exit + self.child.expect(pexpect.EOF) + + +class TestCaseBlockingWrite(PexpectTestCase.PexpectTestCase): + BASH_SCRIPT=('n=0; while [ $n -lt 10000 ]; do echo $n; ' + 'let n="$n + 1"; done; echo END > {0}; exit') + + def test_discard_output_using_background_thread(self): + " Test that a background thread can discard output." + # this is slightly contrary to the recommendation in commonissues.rst + # of section "Threads" regarding whether a background thread can read + # from a file descriptor created in a main thread -- presumably only + # an issue in RHEL 8. This test case matches the first recommendation + # of section, "Blocking Child" of the same document. + bash = pexpect.spawn('bash', echo=False) + result_file = tempfile.mktemp() + try: + bash.sendline(self.BASH_SCRIPT.format(result_file)) + bash.expect(re.compile('\s100\s'), timeout=2) + thread = DiscardOutput(bash) + thread.start() + thread.join() + assert os.path.exists(result_file) + assert open(result_file).read().strip() == 'END' + finally: + if os.path.exists(result_file): + os.unlink(result_file) + + class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_isatty(self): |