summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Pursehouse <david.pursehouse@sonymobile.com>2013-09-12 15:24:34 +0900
committerDavid Pursehouse <david.pursehouse@sonymobile.com>2013-09-12 18:06:10 +0900
commit7fc93f60775f252278af535ade9a7c6cae17a3d1 (patch)
tree4bae69d448608c4eddff1fc65eacad7d07cc8d15
parent6c345c62582769be8394d0a731461a7371aabff2 (diff)
downloadpygerrit-7fc93f60775f252278af535ade9a7c6cae17a3d1.tar.gz
Completely refactor the stream event handling
Instead of using select.select() to wait for input, use the methods provided by paramiko's client channel object. Read from both stdout and stderr, and send ErrorEvent if anything is received on stderr. Ref #1 Fix #12 Change-Id: Ic67392cb9d689a1f457f5d02eb17a5432112a0c0
-rw-r--r--pygerrit/events.py5
-rw-r--r--pygerrit/stream.py36
2 files changed, 18 insertions, 23 deletions
diff --git a/pygerrit/events.py b/pygerrit/events.py
index 9cf4d23..ebbb327 100644
--- a/pygerrit/events.py
+++ b/pygerrit/events.py
@@ -70,7 +70,7 @@ class GerritEventFactory(object):
json_data = json.loads(data)
except ValueError as err:
logging.debug("Failed to load json data: %s: [%s]", str(err), data)
- json_data = ErrorEvent.error_json(str(err))
+ json_data = json.loads(ErrorEvent.error_json(err))
if not "type" in json_data:
raise GerritError("`type` not in json_data")
@@ -117,9 +117,8 @@ class ErrorEvent(GerritEvent):
@classmethod
def error_json(cls, error):
""" Return a json string for the `error`. """
- data = '{"type":"error-event",' \
+ return '{"type":"error-event",' \
'"error":"%s"}' % str(error)
- return json.loads(data)
def __repr__(self):
return u"<ErrorEvent: %s>" % self.error
diff --git a/pygerrit/stream.py b/pygerrit/stream.py
index dcf57ba..37b09f1 100644
--- a/pygerrit/stream.py
+++ b/pygerrit/stream.py
@@ -26,11 +26,8 @@ Class to listen to the Gerrit event stream and dispatch events.
"""
-import logging
-from select import select
from threading import Thread, Event
-from .error import GerritError
from .events import ErrorEvent
@@ -51,23 +48,22 @@ class GerritStream(Thread):
def _error_event(self, error):
""" Dispatch `error` to the Gerrit client. """
- self._gerrit.put_event(ErrorEvent.error_json(str(error)))
+ self._gerrit.put_event(ErrorEvent.error_json(error))
def run(self):
""" Listen to the stream and send events to the client. """
- try:
- result = self._ssh_client.run_gerrit_command("stream-events")
- except GerritError as err:
- self._error_event(err)
- else:
- stdout = result.stdout
- inputready, _outputready, _exceptready = \
- select([stdout.channel], [], [])
- while not self._stop.is_set():
- for _event in inputready:
- try:
- self._gerrit.put_event(stdout.readline())
- except IOError as err:
- self._error_event(err)
- except GerritError as err:
- logging.error("Failed to put event: %s", err)
+ channel = self._ssh_client.get_transport().open_session()
+ channel.exec_command("gerrit stream-events")
+ stdout = channel.makefile()
+ stderr = channel.makefile_stderr()
+ while not self._stop.is_set():
+ if channel.exit_status_ready():
+ if channel.recv_stderr_ready():
+ error = stderr.readline().strip()
+ else:
+ error = "Remote server connection closed"
+ self._error_event(error)
+ self._stop.set()
+ elif channel.recv_ready():
+ data = stdout.readline()
+ self._gerrit.put_event(data)