diff options
| author | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
| commit | 723bff9590332fee6b4ecacb08dc762613994128 (patch) | |
| tree | 6063112085022d53cde7db630078f2ba8a2c0a49 /python/commands/qpid-route | |
| parent | 88e386e9130caecb9a095dd0362dbeff89828adc (diff) | |
| download | qpid-python-723bff9590332fee6b4ecacb08dc762613994128.tar.gz | |
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands/qpid-route')
| -rwxr-xr-x | python/commands/qpid-route | 266 |
1 files changed, 101 insertions, 165 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 3cd9109a6a..172927b72a 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -22,13 +22,8 @@ import getopt import sys import socket -import qpid import os -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect +from qpid import qmfconsole def Usage (): print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" @@ -58,93 +53,57 @@ _dellink = False class RouteManager: def __init__ (self, destBroker): - self.dest = Broker (destBroker) + self.dest = qmfconsole.BrokerURL(destBroker) self.src = None - - def ConnectToBroker (self): - broker = self.dest - if _verbose: - print "Connecting to broker: %s:%d" % (broker.host, broker.port) - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), \ - username=broker.username, password=broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mch = self.mclient.addChannel (self.session) - self.mclient.syncWaitForStable (self.mch) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(destBroker) def Disconnect (self): - self.mclient.removeChannel (self.mch) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.qmf.delBroker(self.broker) def getLink (self): - links = self.mclient.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if "%s:%d" % (link.host, link.port) == self.src.name (): return link return None def AddLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): print "Linking broker to itself is not permitted" sys.exit(1) - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link != None: - print "Link already exists" - sys.exit(1) + raise Exception("Link already exists") - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() def DelLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - - brokers = mc.syncGetObjects (self.mch, "broker") + self.src = qmfconsole.BrokerURL(srcBroker) + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link == None: - print "Link not found" - sys.exit(1) + raise Exception("Link not found") - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if _verbose: - print "Close method returned:", res.status, res.statusText + print "Close method returned:", res.status, res.text def ListLinks (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") if len(links) == 0: print "No Links Found" else: @@ -152,145 +111,118 @@ class RouteManager: print "Host Port Durable State Last Error" print "===================================================================" for link in links: - print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) + print "%-16s%-8d %c %-18s%s" % \ + (link.host, link.port, YN(link.durable), link.state, link.lastError) def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + raise Exception("Linking broker to itself is not permitted") - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] - link = self.getLink () + link = self.getLink() if link == None: if _verbose: print "Inter-broker link not found, creating..." - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() if link == None: - print "Protocol Error - Missing link ID" - sys.exit (1) + raise Exception("Protocol Error - Missing link ID") - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey: if not _quiet: - print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) - sys.exit (1) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) sys.exit (0) if _verbose: print "Creating inter-broker binding..." - bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = tag - bridgeArgs["excludes"] = excludes - bridgeArgs["srcIsQueue"] = 0 - bridgeArgs["srcIsLocal"] = 0 - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0) if res.status == 4: - print "Can't create a durable route on a non-durable link" - sys.exit(1) + raise Exception("Can't create a durable route on a non-durable link") if _verbose: print "Bridge method returned:", res.status, res.statusText def DelRoute (self, srcBroker, exchange, routingKey): - self.src = Broker (srcBroker) - mc = self.mclient - - link = self.getLink () + self.src = qmfconsole.BrokerURL(srcBroker) + link = self.getLink() if link == None: if not _quiet: - print "No link found from %s to %s" % (self.src.name(), self.dest.name()) - sys.exit (1) + raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) sys.exit (0) - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: if _verbose: print "Closing bridge..." - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error closing bridge: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText)) if len (bridges) == 1 and _dellink: link = self.getLink () if link == None: sys.exit (0) if _verbose: print "Last bridge on link, closing link..." - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error closing link: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing link: %d - %s" % (res.status, res.statusText)) sys.exit (0) if not _quiet: - print "Route not found" - sys.exit (1) + raise Exception("Route not found") def ListRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) def ClearAllRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: if _verbose: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: print "Ok" if _dellink: - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: @@ -331,41 +263,45 @@ else: group = cargs[0] cmd = cargs[1] -rm = RouteManager (destBroker) -rm.ConnectToBroker () - -if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.AddLink (cargs[3]) - elif cmd == "del": - if nargs != 4: - Usage() - rm.DelLink (cargs[3]) - elif cmd == "list": - rm.ListLinks () - -elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage () - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) - elif cmd == "del": - if nargs != 6: - Usage () - else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) - else: - if cmd == "list": - rm.ListRoutes () - elif cmd == "flush": - rm.ClearAllRoutes () + +try: + rm = RouteManager (destBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.AddLink (cargs[3]) + elif cmd == "del": + if nargs != 4: + Usage() + rm.DelLink (cargs[3]) + elif cmd == "list": + rm.ListLinks () + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage () + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) + elif cmd == "del": + if nargs != 6: + Usage () + else: + rm.DelRoute (cargs[3], cargs[4], cargs[5]) else: - Usage () + if cmd == "list": + rm.ListRoutes () + elif cmd == "flush": + rm.ClearAllRoutes () + else: + Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + rm.Disconnect () |
