diff options
| author | David Pursehouse <david.pursehouse@sonymobile.com> | 2012-08-24 13:58:33 +0900 |
|---|---|---|
| committer | David Pursehouse <david.pursehouse@sonymobile.com> | 2012-09-11 13:28:51 +0900 |
| commit | d8db31b4e99afef93534c84254e49036b348b905 (patch) | |
| tree | 53ec5323abcdc025d26a93ded3112981194e1820 /pygerrit/stream.py | |
| parent | ce372a4808dfaad89cad6f74d7c618ae2c5a2111 (diff) | |
| download | pygerrit-d8db31b4e99afef93534c84254e49036b348b905.tar.gz | |
Refactor event stream handling to use SSH client
Change-Id: I4953be92719fddeb7c23c2559ffe954c452cc131
Diffstat (limited to 'pygerrit/stream.py')
| -rw-r--r-- | pygerrit/stream.py | 95 |
1 files changed, 29 insertions, 66 deletions
diff --git a/pygerrit/stream.py b/pygerrit/stream.py index 9b339d1..7026213 100644 --- a/pygerrit/stream.py +++ b/pygerrit/stream.py @@ -5,75 +5,38 @@ Class to listen to the Gerrit event stream and dispatch events. """ import json -import logging +from select import poll, POLLIN +from threading import Thread, Event -from pygerrit.error import GerritError -from pygerrit.events import GerritEventFactory +from pygerrit.ssh import GerritSSHClient -class GerritStreamError(Exception): - - """ Raised when an error occurs while reading the Gerrit events stream. """ - - pass - - -class GerritStream(object): +class GerritStream(Thread): """ Gerrit events stream handler. """ - def __init__(self, stream): - self.listeners = [] - self.stream = stream - - def attach(self, listener): - """ Attach the `listener` to the list of listeners. - - Raise GerritStreamError if the listener does not match the - expected signature, or if its event handler is not callable. - - """ - if not hasattr(listener, "on_gerrit_event"): - raise GerritStreamError("Listener must have `on_gerrit_event` " - "event handler method") - if not callable(listener.on_gerrit_event): - raise GerritStreamError("`on_gerrit_event` must be callable") - if not listener.on_gerrit_event.func_code.co_argcount == 2: - raise GerritStreamError("`on_gerrit_event` must take 1 arg") - if not listener in self.listeners: - self.listeners.append(listener) - - def detach(self, listener): - """ Remove the `listener` from the list of listeners. """ - if listener in self.listeners: - try: - self.listeners.remove(listener) - except ValueError: - pass - - def read(self): - """ Read lines of JSON data from `stream` and dispatch events. - - For each line read from `stream`, until EOF, parse the line as - JSON data, instantiate the corresponding GerritEvent, and dispatch it - to the listeners. - - Raise GerritStreamError on any errors. - - """ - try: - while 1: - line = self.stream.readline() - if not line: - break - json_data = json.loads(line) - try: - event = GerritEventFactory.create(json_data) - for listener in self.listeners: - listener.on_gerrit_event(event) - except GerritError, e: - logging.error("Unable to dispatch event: %s", e) - except (IOError, AttributeError), e: - raise GerritStreamError("Error reading event stream: %s" % e) - except ValueError, e: - raise GerritStreamError("Invalid JSON data in event stream: %s" % e) + def __init__(self, gerrit, host): + Thread.__init__(self) + self.daemon = True + self._gerrit = gerrit + self._host = host + self._stop = Event() + + def stop(self): + """ Stop the thread. """ + self._stop.set() + + def run(self): + """ Listen to the stream and send events to the client. """ + client = GerritSSHClient(self._host) + _stdin, stdout, _stderr = client.run_gerrit_command("stream-events") + p = poll() + p.register(stdout.channel) + while not self._stop.is_set(): + data = p.poll() + for (fd, event) in data: + if fd == stdout.channel.fileno(): + if event == POLLIN: + line = stdout.readline() + json_data = json.loads(line) + self._gerrit.put_event(json_data) |
