summaryrefslogtreecommitdiff
path: root/test/service.py
blob: ea29c334e6bacf97ba3c766665b20abcbfe7e67b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import os
import re
import select
import subprocess
import threading
import time

# Public names of this module: the two service wrappers defined below.
__all__ = [
    'ExternalService',
    'SpawnedService',

]


# Module-level logger shared by both service classes; named after this module.
log = logging.getLogger(__name__)


class ExternalService(object):
    """Handle for a service that is already running at a known address.

    The instance only records the address; ``open`` and ``close`` do nothing.
    """

    def __init__(self, host, port):
        """Store ``host`` and ``port`` and log which service is being used."""
        self.host, self.port = host, port
        log.info("Using already running service at %s:%d", self.host, self.port)

    def open(self):
        """No-op; always returns ``None``."""
        return None

    def close(self):
        """No-op; always returns ``None``."""
        return None


class SpawnedService(threading.Thread):
    """Thread that runs a command as a child process and captures its output.

    The run loop starts ``args`` with ``subprocess.Popen``, appends every line
    the child writes to stdout/stderr (decoded as UTF-8) to
    ``captured_stdout`` / ``captured_stderr``, and ends when the child exits
    or :meth:`stop` is called.
    """

    def __init__(self, args=None, env=None):
        """Prepare the service without starting it.

        :param args: command line handed to ``subprocess.Popen``; required.
        :param env: environment for the child, or ``None`` to inherit.
        :raises TypeError: if ``args`` is ``None``.
        """
        threading.Thread.__init__(self)

        if args is None:
            raise TypeError("args parameter is required")
        self.args = args
        self.env = env
        self.captured_stdout = []
        self.captured_stderr = []

        self.should_die = threading.Event()
        self.child = None
        self.alive = False

    def run(self):
        """Thread entry point; delegates to :meth:`run_with_handles`."""
        self.run_with_handles()

    def _spawn(self):
        """Start the child process unless one is already running."""
        if self.alive:
            return
        if self.child and self.child.poll() is None:
            return

        self.child = subprocess.Popen(
            self.args,
            preexec_fn=os.setsid,  # to avoid propagating signals
            env=self.env,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.alive = True

    @staticmethod
    def _close_pipes(child):
        """Close the stdout/stderr pipe objects of ``child``."""
        for stream in (child.stdout, child.stderr):
            if stream is not None:
                stream.close()

    def _despawn(self):
        """Terminate the child, escalating to ``kill`` after about 5 seconds.

        The child is always reaped and its pipes closed before ``self.child``
        is reset to ``None``.
        """
        child = self.child
        self.alive = False
        if child is None:
            return

        if child.poll() is None:
            child.terminate()
        for _ in range(50):
            if child.poll() is not None:
                break
            time.sleep(0.1)
        else:
            child.kill()
            # Reap the killed process so it does not linger as a zombie.
            child.wait()

        self._close_pipes(child)
        self.child = None

    @staticmethod
    def _pump(sinks, timeout):
        """Read at most one line from each readable stream in ``sinks``.

        :param sinks: dict mapping an open pipe to the list collecting its
            decoded lines; a pipe that reports EOF is removed from the dict.
        :param timeout: seconds to wait for any pipe to become readable.
        :returns: ``True`` if at least one line was captured.
        """
        if not sinks:
            # Every pipe reached EOF; just honour the timeout.
            time.sleep(timeout)
            return False

        readable, _, _ = select.select(list(sinks), [], [], timeout)
        captured_any = False
        for stream in readable:
            line = stream.readline()
            if line:
                # 'replace' keeps undecodable bytes from killing the thread.
                sinks[stream].append(line.decode('utf-8', 'replace'))
                captured_any = True
            else:
                # An empty read means EOF: stop selecting on this pipe,
                # otherwise select() would return immediately forever.
                del sinks[stream]
        return captured_any

    def run_with_handles(self):
        """Spawn the child and capture its output until it exits or is stopped.

        If the child exits on its own, the remaining output is drained, the
        captured logs are dumped once and the loop ends. If ``should_die`` is
        set, the child is shut down via :meth:`_despawn`.
        """
        self._spawn()
        child = self.child
        sinks = {
            child.stdout: self.captured_stdout,
            child.stderr: self.captured_stderr,
        }
        while True:
            self._pump(sinks, 1)

            if child.poll() is not None:
                # Collect whatever the child wrote just before exiting.
                while self._pump(sinks, 0):
                    pass
                self._close_pipes(child)
                self.alive = False
                self.dump_logs()
                break

            if self.should_die.is_set():
                self._despawn()
                break

    def dump_logs(self):
        """Log all captured stderr and stdout lines at CRITICAL level."""
        log.critical('stderr')
        for line in self.captured_stderr:
            log.critical(line.rstrip())

        log.critical('stdout')
        for line in self.captured_stdout:
            log.critical(line.rstrip())

    def wait_for(self, pattern, timeout=30):
        """Wait until ``pattern`` shows up in the captured output.

        The captured stdout, then stderr, are searched case-insensitively
        every 0.1 seconds.

        :param pattern: regular expression passed to ``re.search``.
        :param timeout: seconds to wait before giving up.
        :returns: ``True`` once the pattern is found; ``False`` on timeout,
            after killing a still-running child and dumping the logs.
        """
        started = time.time()
        while True:
            elapsed = time.time() - started
            if elapsed >= timeout:
                child = self.child
                try:
                    # Only signal a child that exists and is still running.
                    if child is not None and child.poll() is None:
                        child.kill()
                except Exception:
                    log.exception("Received exception when killing child process")
                self.dump_logs()

                log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
                return False

            sources = (
                ("stdout", self.captured_stdout),
                ("stderr", self.captured_stderr),
            )
            for source, lines in sources:
                if re.search(pattern, '\n'.join(lines), re.IGNORECASE) is not None:
                    log.info("Found pattern %r in %d seconds via %s", pattern, elapsed, source)
                    return True
            time.sleep(0.1)

    def start(self):
        """Start the thread, which in turn spawns the child process."""
        threading.Thread.start(self)

    def stop(self):
        """Ask the run loop to shut the child down and wait for the thread."""
        self.should_die.set()
        self.join()