1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
import logging
import os
import re
import select
import subprocess
import threading
import time
__all__ = [
'ExternalService',
'SpawnedService',
]
log = logging.getLogger(__name__)
class ExternalService(object):
def __init__(self, host, port):
log.info("Using already running service at %s:%d", host, port)
self.host = host
self.port = port
def open(self):
pass
def close(self):
pass
class SpawnedService(threading.Thread):
def __init__(self, args=None, env=None):
threading.Thread.__init__(self)
if args is None:
raise TypeError("args parameter is required")
self.args = args
self.env = env
self.captured_stdout = []
self.captured_stderr = []
self.should_die = threading.Event()
self.child = None
self.alive = False
def run(self):
self.run_with_handles()
def _spawn(self):
if self.alive: return
if self.child and self.child.poll() is None: return
self.child = subprocess.Popen(
self.args,
preexec_fn=os.setsid, # to avoid propagating signals
env=self.env,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.alive = True
def _despawn(self):
if self.child.poll() is None:
self.child.terminate()
self.alive = False
for _ in range(50):
if self.child.poll() is not None:
self.child = None
break
time.sleep(0.1)
else:
self.child.kill()
def run_with_handles(self):
self._spawn()
while True:
(rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
if self.child.stdout in rds:
line = self.child.stdout.readline()
self.captured_stdout.append(line.decode('utf-8'))
if self.child.stderr in rds:
line = self.child.stderr.readline()
self.captured_stderr.append(line.decode('utf-8'))
if self.child.poll() is not None:
self.dump_logs()
self._spawn()
if self.should_die.is_set():
self._despawn()
break
def dump_logs(self):
log.critical('stderr')
for line in self.captured_stderr:
log.critical(line.rstrip())
log.critical('stdout')
for line in self.captured_stdout:
log.critical(line.rstrip())
def wait_for(self, pattern, timeout=30):
t1 = time.time()
while True:
t2 = time.time()
if t2 - t1 >= timeout:
try:
self.child.kill()
except:
log.exception("Received exception when killing child process")
self.dump_logs()
log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
return False
if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
log.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1))
return True
if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
log.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1))
return True
time.sleep(0.1)
def start(self):
threading.Thread.start(self)
def stop(self):
self.should_die.set()
self.join()
|