diff options
| author | Ted Ross <tross@apache.org> | 2012-06-12 19:06:13 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2012-06-12 19:06:13 +0000 |
| commit | 1e27038d5cd6a2681eb4cea23fe4e46ddf4d4dd7 (patch) | |
| tree | ee0b6e1bbce0ed2d39d560b75705551e40cebfae | |
| parent | 53e613ed99692945b68d389b0b32c66bfb2d2ac5 (diff) | |
| download | qpid-python-1e27038d5cd6a2681eb4cea23fe4e46ddf4d4dd7.tar.gz | |
QPID-4059 - qpid-printevents refactored to use the lighter-weight management library
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349476 13f79535-47bb-0310-9956-ffa450edef68
| -rwxr-xr-x | qpid/tools/src/py/qpid-printevents | 113 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/broker.py | 38 |
2 files changed, 123 insertions, 28 deletions
diff --git a/qpid/tools/src/py/qpid-printevents b/qpid/tools/src/py/qpid-printevents index d56d2899b1..7c3e2b6c23 100755 --- a/qpid/tools/src/py/qpid-printevents +++ b/qpid/tools/src/py/qpid-printevents @@ -21,34 +21,85 @@ import os import optparse -from optparse import IndentedHelpFormatter import sys -import socket -from time import time, strftime, gmtime, sleep -from qmf.console import Console, Session +from optparse import IndentedHelpFormatter +from time import time, strftime, gmtime, sleep +from threading import Lock, Condition, Thread +from qpid.messaging import Connection +import qpid.messaging.exceptions +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) -class EventConsole(Console): - def event(self, broker, event): - print event - sys.stdout.flush() +from qpidtoollibs.broker import EventHelper - def brokerConnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() - sys.stdout.flush() - def brokerConnectionFailed(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc)) - sys.stdout.flush() +class Printer(object): + """ + This class serializes printed lines so that events coming from different + threads don't overlap each other. + """ + def __init__(self): + self.lock = Lock() - def brokerDisconnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() + def pr(self, text): + self.lock.acquire() + try: + print text + finally: + self.lock.release() sys.stdout.flush() + + +class EventReceiver(Thread): + """ + One instance of this class is created for each broker that is being monitored. + This class does not use the "reconnect" option because it needs to report as + events when the connection is established and when it's lost. + """ + def __init__(self, printer, url, mechanism, options): + Thread.__init__(self) + self.printer = printer + self.url = url + self.mechanism = mechanism + self.options = options + self.running = True + self.helper = EventHelper() + + def cancel(self): + self.running = False + + def run(self): + isOpen = False + while self.running: + try: + conn = Connection.establish(self.url, sasl_mechanisms=self.mechanism, client_properties=self.options) + isOpen = True + self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url) + + sess = conn.session() + rx = sess.receiver(self.helper.eventAddress()) + + while self.running: + try: + msg = rx.fetch(1) + event = self.helper.event(msg) + self.printer.pr(event.__repr__()) + sess.acknowledge() + except qpid.messaging.exceptions.Empty: + pass + + except Exception, e: + if isOpen: + self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url) + isOpen = False + sleep(1) + class JHelpFormatter(IndentedHelpFormatter): - """Format usage and description without stripping newlines from usage strings """ - + Format usage and description without stripping newlines from usage strings + """ def format_usage(self, usage): return usage @@ -87,16 +138,23 @@ def main(argv=None): if len(arguments) == 0: arguments.append("localhost") - console = EventConsole() - session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True) - brokers = [] + brokers = [] + mechanism = options.sasl_mechanism + props = {'qpid.ha-admin' : 1} + printer = Printer() + + if options.heartbeats: + props['heartbeat'] = 5 + try: try: for host in arguments: - brokers.append(session.addBroker(host, None, options.sasl_mechanism)) + er = EventReceiver(printer, host, mechanism, props) + brokers.append(er) + er.start() - while (True): - sleep(10) + while (True): + sleep(10) except KeyboardInterrupt: print @@ -106,9 +164,10 @@ def main(argv=None): print "Failed: %s - %s" % (e.__class__.__name__, e) return 1 finally: - while len(brokers): - b = brokers.pop() - session.delBroker(b) + for b in brokers: + b.cancel() + for b in brokers: + b.join() if __name__ == '__main__': sys.exit(main()) diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index 0bae786306..840db05795 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -18,6 +18,7 @@ # from qpid.messaging import Message +from qpidtoollibs.disp import TimeLong try: from uuid import uuid4 except ImportError: @@ -295,6 +296,41 @@ class BrokerAgent(object): return self._getBrokerObject(self, _type, oid) +class EventHelper(object): + def eventAddress(self, pkg='*', cls='*', sev='*'): + return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) + + def event(self, msg): + return BrokerEvent(msg) + + +class BrokerEvent(object): + def __init__(self, msg): + self.msg = msg + self.content = msg.content[0] + self.values = self.content['_values'] + self.schema_id = self.content['_schema_id'] + self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) + + def __repr__(self): + rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) + for k,v in self.values.items(): + rep = rep + " %s=%s" % (k, v) + return rep + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + return value + + def getAttributes(self): + return self.values + + def getTimestamp(self): + return self.content['_timestamp'] + + class BrokerObject(object): def __init__(self, broker, content): self.broker = broker @@ -362,7 +398,7 @@ class Connection(BrokerObject): BrokerObject.__init__(self, broker, values) def close(self): - pass + self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) class Session(BrokerObject): def __init__(self, broker, values): |
