diff options
| author | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
| commit | 723bff9590332fee6b4ecacb08dc762613994128 (patch) | |
| tree | 6063112085022d53cde7db630078f2ba8a2c0a49 /python/commands | |
| parent | 88e386e9130caecb9a095dd0362dbeff89828adc (diff) | |
| download | qpid-python-723bff9590332fee6b4ecacb08dc762613994128.tar.gz | |
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
| -rwxr-xr-x | python/commands/qpid-config | 220 | ||||
| -rwxr-xr-x | python/commands/qpid-route | 266 |
2 files changed, 175 insertions, 311 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config index cc9315f7ea..6bc38c7440 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -22,16 +22,7 @@ import os import getopt import sys -import socket -import qpid -from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 -from qpid.util import connect -from time import sleep +from qpid import qmfconsole _recursive = False _host = "localhost" @@ -78,44 +69,21 @@ def Usage (): class BrokerManager: def __init__ (self): - self.dest = None - self.src = None - self.broker = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session (self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(brokerUrl) + + def Disconnect(self): + self.qmf.delBroker(self.broker) def Overview (self): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + queues = self.qmf.getObjects(name="queue") print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -136,11 +104,7 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") + exchanges = self.qmf.getObjects(name="exchange") print "Durable Type Bindings Exchange Name" print "=======================================================" for ex in exchanges: @@ -148,18 +112,14 @@ class BrokerManager: print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.id: + if bind.exchangeRef == ex.getObjectId(): qname = "<unknown>" queue = self.findById (queues, bind.queueRef) if queue != None: @@ -168,12 +128,8 @@ class BrokerManager: def QueueList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - queues = mc.syncGetObjects (mch, "queue") - journals = mc.syncGetObjects (mch, "journal") + queues = self.qmf.getObjects(name="queue") + journals = self.qmf.getObjects(name="journal") print " Store Size" print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" print "===========================================================================================" @@ -193,18 +149,14 @@ class BrokerManager: YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.id: + if bind.queueRef == queue.getObjectId(): ename = "<unknown>" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: @@ -216,30 +168,19 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () etype = args[0] ename = args[1] - - try: - self.session.exchange_declare (exchange=ename, type=etype, durable=_durable) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable) def DelExchange (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () ename = args[0] - - try: - self.session.exchange_delete (exchange=ename) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] declArgs = {} if _durable: @@ -251,55 +192,37 @@ class BrokerManager: if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - try: - self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] - - try: - self.session.queue_delete (queue=qname) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_delete (queue=qname) def Bind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: - if item.id == id: + if item.getObjectId() == id: return item return None @@ -343,43 +266,48 @@ for opt in optlist: nargs = len (cargs) bm = BrokerManager () -bm.SetBroker (Broker (_host)) - -if nargs == 0: - bm.Overview () -else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd[0] == 'e': - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd[0] == 'q': - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) + +try: + bm.SetBroker(qmfconsole.BrokerURL(_host)) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd[0] == 'e': + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd[0] == 'q': + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) else: Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + bm.Disconnect() diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 3cd9109a6a..172927b72a 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -22,13 +22,8 @@ import getopt import sys import socket -import qpid import os -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect +from qpid import qmfconsole def Usage (): print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" @@ -58,93 +53,57 @@ _dellink = False class RouteManager: def __init__ (self, destBroker): - self.dest = Broker (destBroker) + self.dest = qmfconsole.BrokerURL(destBroker) self.src = None - - def ConnectToBroker (self): - broker = self.dest - if _verbose: - print "Connecting to broker: %s:%d" % (broker.host, broker.port) - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), \ - username=broker.username, password=broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mch = self.mclient.addChannel (self.session) - self.mclient.syncWaitForStable (self.mch) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(destBroker) def Disconnect (self): - self.mclient.removeChannel (self.mch) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.qmf.delBroker(self.broker) def getLink (self): - links = self.mclient.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if "%s:%d" % (link.host, link.port) == self.src.name (): return link return None def AddLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): print "Linking broker to itself is not permitted" sys.exit(1) - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link != None: - print "Link already exists" - sys.exit(1) + raise Exception("Link already exists") - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() def DelLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - - brokers = mc.syncGetObjects (self.mch, "broker") + self.src = qmfconsole.BrokerURL(srcBroker) + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link == None: - print "Link not found" - sys.exit(1) + raise Exception("Link not found") - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if _verbose: - print "Close method returned:", res.status, res.statusText + print "Close method returned:", res.status, res.text def ListLinks (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") if len(links) == 0: print "No Links Found" else: @@ -152,145 +111,118 @@ class RouteManager: print "Host Port Durable State Last Error" print "===================================================================" for link in links: - print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) + print "%-16s%-8d %c %-18s%s" % \ + (link.host, link.port, YN(link.durable), link.state, link.lastError) def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + raise Exception("Linking broker to itself is not permitted") - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] - link = self.getLink () + link = self.getLink() if link == None: if _verbose: print "Inter-broker link not found, creating..." - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() if link == None: - print "Protocol Error - Missing link ID" - sys.exit (1) + raise Exception("Protocol Error - Missing link ID") - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey: if not _quiet: - print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) - sys.exit (1) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) sys.exit (0) if _verbose: print "Creating inter-broker binding..." - bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = tag - bridgeArgs["excludes"] = excludes - bridgeArgs["srcIsQueue"] = 0 - bridgeArgs["srcIsLocal"] = 0 - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0) if res.status == 4: - print "Can't create a durable route on a non-durable link" - sys.exit(1) + raise Exception("Can't create a durable route on a non-durable link") if _verbose: print "Bridge method returned:", res.status, res.statusText def DelRoute (self, srcBroker, exchange, routingKey): - self.src = Broker (srcBroker) - mc = self.mclient - - link = self.getLink () + self.src = qmfconsole.BrokerURL(srcBroker) + link = self.getLink() if link == None: if not _quiet: - print "No link found from %s to %s" % (self.src.name(), self.dest.name()) - sys.exit (1) + raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) sys.exit (0) - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: if _verbose: print "Closing bridge..." - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error closing bridge: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText)) if len (bridges) == 1 and _dellink: link = self.getLink () if link == None: sys.exit (0) if _verbose: print "Last bridge on link, closing link..." - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error closing link: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing link: %d - %s" % (res.status, res.statusText)) sys.exit (0) if not _quiet: - print "Route not found" - sys.exit (1) + raise Exception("Route not found") def ListRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) def ClearAllRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: if _verbose: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: print "Ok" if _dellink: - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: @@ -331,41 +263,45 @@ else: group = cargs[0] cmd = cargs[1] -rm = RouteManager (destBroker) -rm.ConnectToBroker () - -if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.AddLink (cargs[3]) - elif cmd == "del": - if nargs != 4: - Usage() - rm.DelLink (cargs[3]) - elif cmd == "list": - rm.ListLinks () - -elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage () - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) - elif cmd == "del": - if nargs != 6: - Usage () - else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) - else: - if cmd == "list": - rm.ListRoutes () - elif cmd == "flush": - rm.ClearAllRoutes () + +try: + rm = RouteManager (destBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.AddLink (cargs[3]) + elif cmd == "del": + if nargs != 4: + Usage() + rm.DelLink (cargs[3]) + elif cmd == "list": + rm.ListLinks () + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage () + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) + elif cmd == "del": + if nargs != 6: + Usage () + else: + rm.DelRoute (cargs[3], cargs[4], cargs[5]) else: - Usage () + if cmd == "list": + rm.ListRoutes () + elif cmd == "flush": + rm.ClearAllRoutes () + else: + Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + rm.Disconnect () |
