summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-06-12 19:06:13 +0000
committerTed Ross <tross@apache.org>2012-06-12 19:06:13 +0000
commit1e27038d5cd6a2681eb4cea23fe4e46ddf4d4dd7 (patch)
treeee0b6e1bbce0ed2d39d560b75705551e40cebfae
parent53e613ed99692945b68d389b0b32c66bfb2d2ac5 (diff)
downloadqpid-python-1e27038d5cd6a2681eb4cea23fe4e46ddf4d4dd7.tar.gz
QPID-4059 - qpid-printevents refactored to use the lighter-weight management library
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349476 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/tools/src/py/qpid-printevents113
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py38
2 files changed, 123 insertions, 28 deletions
diff --git a/qpid/tools/src/py/qpid-printevents b/qpid/tools/src/py/qpid-printevents
index d56d2899b1..7c3e2b6c23 100755
--- a/qpid/tools/src/py/qpid-printevents
+++ b/qpid/tools/src/py/qpid-printevents
@@ -21,34 +21,85 @@
import os
import optparse
-from optparse import IndentedHelpFormatter
import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
+from optparse import IndentedHelpFormatter
+from time import time, strftime, gmtime, sleep
+from threading import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
-class EventConsole(Console):
- def event(self, broker, event):
- print event
- sys.stdout.flush()
+from qpidtoollibs.broker import EventHelper
- def brokerConnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
- sys.stdout.flush()
- def brokerConnectionFailed(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
- sys.stdout.flush()
+class Printer(object):
+ """
+ This class serializes printed lines so that events coming from different
+ threads don't overlap each other.
+ """
+ def __init__(self):
+ self.lock = Lock()
- def brokerDisconnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
+ def pr(self, text):
+ self.lock.acquire()
+ try:
+ print text
+ finally:
+ self.lock.release()
sys.stdout.flush()
+
+
+class EventReceiver(Thread):
+ """
+ One instance of this class is created for each broker that is being monitored.
+ This class does not use the "reconnect" option because it needs to report as
+ events when the connection is established and when it's lost.
+ """
+ def __init__(self, printer, url, mechanism, options):
+ Thread.__init__(self)
+ self.printer = printer
+ self.url = url
+ self.mechanism = mechanism
+ self.options = options
+ self.running = True
+ self.helper = EventHelper()
+
+ def cancel(self):
+ self.running = False
+
+ def run(self):
+ isOpen = False
+ while self.running:
+ try:
+ conn = Connection.establish(self.url, sasl_mechanisms=self.mechanism, client_properties=self.options)
+ isOpen = True
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+ sess = conn.session()
+ rx = sess.receiver(self.helper.eventAddress())
+
+ while self.running:
+ try:
+ msg = rx.fetch(1)
+ event = self.helper.event(msg)
+ self.printer.pr(event.__repr__())
+ sess.acknowledge()
+ except qpid.messaging.exceptions.Empty:
+ pass
+
+ except Exception, e:
+ if isOpen:
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+ isOpen = False
+ sleep(1)
+
class JHelpFormatter(IndentedHelpFormatter):
- """Format usage and description without stripping newlines from usage strings
"""
-
+ Format usage and description without stripping newlines from usage strings
+ """
def format_usage(self, usage):
return usage
@@ -87,16 +138,23 @@ def main(argv=None):
if len(arguments) == 0:
arguments.append("localhost")
- console = EventConsole()
- session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
- brokers = []
+ brokers = []
+ mechanism = options.sasl_mechanism
+ props = {'qpid.ha-admin' : 1}
+ printer = Printer()
+
+ if options.heartbeats:
+ props['heartbeat'] = 5
+
try:
try:
for host in arguments:
- brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+ er = EventReceiver(printer, host, mechanism, props)
+ brokers.append(er)
+ er.start()
- while (True):
- sleep(10)
+ while (True):
+ sleep(10)
except KeyboardInterrupt:
print
@@ -106,9 +164,10 @@ def main(argv=None):
print "Failed: %s - %s" % (e.__class__.__name__, e)
return 1
finally:
- while len(brokers):
- b = brokers.pop()
- session.delBroker(b)
+ for b in brokers:
+ b.cancel()
+ for b in brokers:
+ b.join()
if __name__ == '__main__':
sys.exit(main())
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
index 0bae786306..840db05795 100644
--- a/qpid/tools/src/py/qpidtoollibs/broker.py
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -18,6 +18,7 @@
#
from qpid.messaging import Message
+from qpidtoollibs.disp import TimeLong
try:
from uuid import uuid4
except ImportError:
@@ -295,6 +296,41 @@ class BrokerAgent(object):
return self._getBrokerObject(self, _type, oid)
+class EventHelper(object):
+ def eventAddress(self, pkg='*', cls='*', sev='*'):
+ return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
+
+ def event(self, msg):
+ return BrokerEvent(msg)
+
+
+class BrokerEvent(object):
+ def __init__(self, msg):
+ self.msg = msg
+ self.content = msg.content[0]
+ self.values = self.content['_values']
+ self.schema_id = self.content['_schema_id']
+ self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
+
+ def __repr__(self):
+ rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
+ for k,v in self.values.items():
+ rep = rep + " %s=%s" % (k, v)
+ return rep
+
+ def __getattr__(self, key):
+ if key not in self.values:
+ return None
+ value = self.values[key]
+ return value
+
+ def getAttributes(self):
+ return self.values
+
+ def getTimestamp(self):
+ return self.content['_timestamp']
+
+
class BrokerObject(object):
def __init__(self, broker, content):
self.broker = broker
@@ -362,7 +398,7 @@ class Connection(BrokerObject):
BrokerObject.__init__(self, broker, values)
def close(self):
- pass
+ self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
class Session(BrokerObject):
def __init__(self, broker, values):