summaryrefslogtreecommitdiff
path: root/pygerrit/stream.py
diff options
context:
space:
mode:
authorDavid Pursehouse <david.pursehouse@sonymobile.com>2012-08-24 13:58:33 +0900
committerDavid Pursehouse <david.pursehouse@sonymobile.com>2012-09-11 13:28:51 +0900
commitd8db31b4e99afef93534c84254e49036b348b905 (patch)
tree53ec5323abcdc025d26a93ded3112981194e1820 /pygerrit/stream.py
parentce372a4808dfaad89cad6f74d7c618ae2c5a2111 (diff)
downloadpygerrit-d8db31b4e99afef93534c84254e49036b348b905.tar.gz
Refactor event stream handling to use SSH client
Change-Id: I4953be92719fddeb7c23c2559ffe954c452cc131
Diffstat (limited to 'pygerrit/stream.py')
-rw-r--r--pygerrit/stream.py95
1 files changed, 29 insertions, 66 deletions
diff --git a/pygerrit/stream.py b/pygerrit/stream.py
index 9b339d1..7026213 100644
--- a/pygerrit/stream.py
+++ b/pygerrit/stream.py
@@ -5,75 +5,38 @@ Class to listen to the Gerrit event stream and dispatch events.
"""
import json
-import logging
+from select import poll, POLLIN
+from threading import Thread, Event
-from pygerrit.error import GerritError
-from pygerrit.events import GerritEventFactory
+from pygerrit.ssh import GerritSSHClient
-class GerritStreamError(Exception):
-
- """ Raised when an error occurs while reading the Gerrit events stream. """
-
- pass
-
-
-class GerritStream(object):
+class GerritStream(Thread):
""" Gerrit events stream handler. """
- def __init__(self, stream):
- self.listeners = []
- self.stream = stream
-
- def attach(self, listener):
- """ Attach the `listener` to the list of listeners.
-
- Raise GerritStreamError if the listener does not match the
- expected signature, or if its event handler is not callable.
-
- """
- if not hasattr(listener, "on_gerrit_event"):
- raise GerritStreamError("Listener must have `on_gerrit_event` "
- "event handler method")
- if not callable(listener.on_gerrit_event):
- raise GerritStreamError("`on_gerrit_event` must be callable")
- if not listener.on_gerrit_event.func_code.co_argcount == 2:
- raise GerritStreamError("`on_gerrit_event` must take 1 arg")
- if not listener in self.listeners:
- self.listeners.append(listener)
-
- def detach(self, listener):
- """ Remove the `listener` from the list of listeners. """
- if listener in self.listeners:
- try:
- self.listeners.remove(listener)
- except ValueError:
- pass
-
- def read(self):
- """ Read lines of JSON data from `stream` and dispatch events.
-
- For each line read from `stream`, until EOF, parse the line as
- JSON data, instantiate the corresponding GerritEvent, and dispatch it
- to the listeners.
-
- Raise GerritStreamError on any errors.
-
- """
- try:
- while 1:
- line = self.stream.readline()
- if not line:
- break
- json_data = json.loads(line)
- try:
- event = GerritEventFactory.create(json_data)
- for listener in self.listeners:
- listener.on_gerrit_event(event)
- except GerritError, e:
- logging.error("Unable to dispatch event: %s", e)
- except (IOError, AttributeError), e:
- raise GerritStreamError("Error reading event stream: %s" % e)
- except ValueError, e:
- raise GerritStreamError("Invalid JSON data in event stream: %s" % e)
+ def __init__(self, gerrit, host):
+ Thread.__init__(self)
+ self.daemon = True
+ self._gerrit = gerrit
+ self._host = host
+ self._stop = Event()
+
+ def stop(self):
+ """ Stop the thread. """
+ self._stop.set()
+
+ def run(self):
+ """ Listen to the stream and send events to the client. """
+ client = GerritSSHClient(self._host)
+ _stdin, stdout, _stderr = client.run_gerrit_command("stream-events")
+ p = poll()
+ p.register(stdout.channel)
+ while not self._stop.is_set():
+ data = p.poll()
+ for (fd, event) in data:
+ if fd == stdout.channel.fileno():
+ if event == POLLIN:
+ line = stdout.readline()
+ json_data = json.loads(line)
+ self._gerrit.put_event(json_data)