summaryrefslogtreecommitdiff
path: root/test/service.py
blob: 1b95cbce7397018f79b5ebb6acb3b3be358033e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import re
import select
import subprocess
import sys
import threading
import time

__all__ = [
    'ExternalService',
    'SpawnedService',

]

class ExternalService(object):
    def __init__(self, host, port):
        print("Using already running service at %s:%d" % (host, port))
        self.host = host
        self.port = port

    def open(self):
        pass

    def close(self):
        pass


class SpawnedService(threading.Thread):
    def __init__(self, args=[]):
        threading.Thread.__init__(self)

        self.args = args
        self.captured_stdout = ""
        self.captured_stderr = ""
        self.stdout_file = None
        self.stderr_file = None
        self.capture_stdout = True
        self.capture_stderr = True
        self.show_stdout = True
        self.show_stderr = True

        self.should_die = threading.Event()

    def configure_stdout(self, file=None, capture=True, show=False):
        self.stdout_file = file
        self.capture_stdout = capture
        self.show_stdout = show

    def configure_stderr(self, file=None, capture=False, show=False):
        self.stderr_file = file
        self.capture_stderr = capture
        self.show_stderr = show

    def run(self):
        stdout_handle = None
        stderr_handle = None
        try:
            if self.stdout_file:
                stdout_handle = open(self.stdout_file, "w")
            if self.stderr_file:
                stderr_handle = open(self.stderr_file, "w")
            self.run_with_handles(stdout_handle, stderr_handle)
        finally:
            if stdout_handle:
                stdout_handle.close()
            if stderr_handle:
                stderr_handle.close()

    def run_with_handles(self, stdout_handle, stderr_handle):
        child = subprocess.Popen(
            self.args,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        alive = True

        while True:
            (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)

            if child.stdout in rds:
                line = child.stdout.readline()
                if stdout_handle:
                    stdout_handle.write(line)
                    stdout_handle.flush()
                if self.capture_stdout:
                    self.captured_stdout += line
                if self.show_stdout:
                    sys.stdout.write(line)
                    sys.stdout.flush()

            if child.stderr in rds:
                line = child.stderr.readline()
                if stderr_handle:
                    stderr_handle.write(line)
                    stderr_handle.flush()
                if self.capture_stderr:
                    self.captured_stderr += line
                if self.show_stderr:
                    sys.stderr.write(line)
                    sys.stderr.flush()

            if self.should_die.is_set():
                child.terminate()
                alive = False

            if child.poll() is not None:
                if not alive:
                    break
                else:
                    raise RuntimeError("Subprocess has died. Aborting.")

    def wait_for(self, pattern, timeout=10):
        t1 = time.time()
        while True:
            t2 = time.time()
            if t2 - t1 >= timeout:
                raise RuntimeError("Waiting for %r timed out" % pattern)

            if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
                return
            if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None:
                return
            time.sleep(0.1)

    def start(self):
        threading.Thread.start(self)

    def stop(self):
        self.should_die.set()
        self.join()