summaryrefslogtreecommitdiff
path: root/python/commands/qpid-route
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
committerTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
commit723bff9590332fee6b4ecacb08dc762613994128 (patch)
tree6063112085022d53cde7db630078f2ba8a2c0a49 /python/commands/qpid-route
parent88e386e9130caecb9a095dd0362dbeff89828adc (diff)
downloadqpid-python-723bff9590332fee6b4ecacb08dc762613994128.tar.gz
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands/qpid-route')
-rwxr-xr-xpython/commands/qpid-route266
1 files changed, 101 insertions, 165 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index 3cd9109a6a..172927b72a 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -22,13 +22,8 @@
import getopt
import sys
import socket
-import qpid
import os
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.util import connect
+from qpid import qmfconsole
def Usage ():
print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
@@ -58,93 +53,57 @@ _dellink = False
class RouteManager:
def __init__ (self, destBroker):
- self.dest = Broker (destBroker)
+ self.dest = qmfconsole.BrokerURL(destBroker)
self.src = None
-
- def ConnectToBroker (self):
- broker = self.dest
- if _verbose:
- print "Connecting to broker: %s:%d" % (broker.host, broker.port)
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (broker.host, broker.port), \
- username=broker.username, password=broker.password)
- self.conn.start ()
- self.session = self.conn.session(self.sessionId)
- self.mclient = managementClient (self.conn.spec)
- self.mch = self.mclient.addChannel (self.session)
- self.mclient.syncWaitForStable (self.mch)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
+ self.qmf = qmfconsole.Session()
+ self.broker = self.qmf.addBroker(destBroker)
def Disconnect (self):
- self.mclient.removeChannel (self.mch)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
+ self.qmf.delBroker(self.broker)
def getLink (self):
- links = self.mclient.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
for link in links:
if "%s:%d" % (link.host, link.port) == self.src.name ():
return link
return None
def AddLink (self, srcBroker):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
+ self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.name() == self.src.name():
print "Linking broker to itself is not permitted"
sys.exit(1)
- brokers = mc.syncGetObjects (self.mch, "broker")
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
link = self.getLink()
if link != None:
- print "Link already exists"
- sys.exit(1)
+ raise Exception("Link already exists")
- connectArgs = {}
- connectArgs["host"] = self.src.host
- connectArgs["port"] = self.src.port
- connectArgs["useSsl"] = False
- connectArgs["durable"] = _durable
- if self.src.username == "anonymous":
- connectArgs["authMechanism"] = "ANONYMOUS"
+ if self.src.authName == "anonymous":
+ mech = "ANONYMOUS"
else:
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
- res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ mech = "PLAIN"
+ res = broker.connect(self.src.host, self.src.port, False, _durable,
+ mech, self.src.authName, self.src.authPass)
if _verbose:
- print "Connect method returned:", res.status, res.statusText
- link = self.getLink ()
+ print "Connect method returned:", res.status, res.text
+ link = self.getLink()
def DelLink (self, srcBroker):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
- brokers = mc.syncGetObjects (self.mch, "broker")
+ self.src = qmfconsole.BrokerURL(srcBroker)
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
- print "Link not found"
- sys.exit(1)
+ raise Exception("Link not found")
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if _verbose:
- print "Close method returned:", res.status, res.statusText
+ print "Close method returned:", res.status, res.text
def ListLinks (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
if len(links) == 0:
print "No Links Found"
else:
@@ -152,145 +111,118 @@ class RouteManager:
print "Host Port Durable State Last Error"
print "==================================================================="
for link in links:
- print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError)
+ print "%-16s%-8d %c %-18s%s" % \
+ (link.host, link.port, YN(link.durable), link.state, link.lastError)
def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
+ self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.name() == self.src.name():
- print "Linking broker to itself is not permitted"
- sys.exit(1)
+ raise Exception("Linking broker to itself is not permitted")
- brokers = mc.syncGetObjects (self.mch, "broker")
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
- link = self.getLink ()
+ link = self.getLink()
if link == None:
if _verbose:
print "Inter-broker link not found, creating..."
- connectArgs = {}
- connectArgs["host"] = self.src.host
- connectArgs["port"] = self.src.port
- connectArgs["useSsl"] = False
- connectArgs["durable"] = _durable
- if self.src.username == "anonymous":
- connectArgs["authMechanism"] = "ANONYMOUS"
+ if self.src.authName == "anonymous":
+ mech = "ANONYMOUS"
else:
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
- res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ mech = "PLAIN"
+ res = broker.connect(self.src.host, self.src.port, False, _durable,
+ mech, self.src.authName, self.src.authPass)
if _verbose:
- print "Connect method returned:", res.status, res.statusText
- link = self.getLink ()
+ print "Connect method returned:", res.status, res.text
+ link = self.getLink()
if link == None:
- print "Protocol Error - Missing link ID"
- sys.exit (1)
+ raise Exception("Protocol Error - Missing link ID")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
- if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey:
if not _quiet:
- print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
- sys.exit (1)
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
sys.exit (0)
if _verbose:
print "Creating inter-broker binding..."
- bridgeArgs = {}
- bridgeArgs["durable"] = _durable
- bridgeArgs["src"] = exchange
- bridgeArgs["dest"] = exchange
- bridgeArgs["key"] = routingKey
- bridgeArgs["tag"] = tag
- bridgeArgs["excludes"] = excludes
- bridgeArgs["srcIsQueue"] = 0
- bridgeArgs["srcIsLocal"] = 0
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0)
if res.status == 4:
- print "Can't create a durable route on a non-durable link"
- sys.exit(1)
+ raise Exception("Can't create a durable route on a non-durable link")
if _verbose:
print "Bridge method returned:", res.status, res.statusText
def DelRoute (self, srcBroker, exchange, routingKey):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
- link = self.getLink ()
+ self.src = qmfconsole.BrokerURL(srcBroker)
+ link = self.getLink()
if link == None:
if not _quiet:
- print "No link found from %s to %s" % (self.src.name(), self.dest.name())
- sys.exit (1)
+ raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
sys.exit (0)
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
- if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
if _verbose:
print "Closing bridge..."
- res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ res = bridge.close()
if res.status != 0:
- print "Error closing bridge: %d - %s" % (res.status, res.statusText)
- sys.exit (1)
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText))
if len (bridges) == 1 and _dellink:
link = self.getLink ()
if link == None:
sys.exit (0)
if _verbose:
print "Last bridge on link, closing link..."
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if res.status != 0:
- print "Error closing link: %d - %s" % (res.status, res.statusText)
- sys.exit (1)
+ raise Exception("Error closing link: %d - %s" % (res.status, res.statusText))
sys.exit (0)
if not _quiet:
- print "Route not found"
- sys.exit (1)
+ raise Exception("Route not found")
def ListRoutes (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ links = self.qmf.getObjects(name="link")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
myLink = None
for link in links:
- if bridge.linkRef == link.id:
+ if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
def ClearAllRoutes (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ links = self.qmf.getObjects(name="link")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
if _verbose:
myLink = None
for link in links:
- if bridge.linkRef == link.id:
+ if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
- res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ res = bridge.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.statusText)
elif _verbose:
print "Ok"
if _dellink:
- links = mc.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.statusText)
elif _verbose:
@@ -331,41 +263,45 @@ else:
group = cargs[0]
cmd = cargs[1]
-rm = RouteManager (destBroker)
-rm.ConnectToBroker ()
-
-if group == "link":
- if cmd == "add":
- if nargs != 4:
- Usage()
- rm.AddLink (cargs[3])
- elif cmd == "del":
- if nargs != 4:
- Usage()
- rm.DelLink (cargs[3])
- elif cmd == "list":
- rm.ListLinks ()
-
-elif group == "route":
- if cmd == "add":
- if nargs < 6 or nargs > 8:
- Usage ()
-
- tag = ""
- excludes = ""
- if nargs > 6: tag = cargs[6]
- if nargs > 7: excludes = cargs[7]
- rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
- elif cmd == "del":
- if nargs != 6:
- Usage ()
- else:
- rm.DelRoute (cargs[3], cargs[4], cargs[5])
- else:
- if cmd == "list":
- rm.ListRoutes ()
- elif cmd == "flush":
- rm.ClearAllRoutes ()
+
+try:
+ rm = RouteManager (destBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs != 4:
+ Usage()
+ rm.AddLink (cargs[3])
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ rm.DelLink (cargs[3])
+ elif cmd == "list":
+ rm.ListLinks ()
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 8:
+ Usage ()
+
+ tag = ""
+ excludes = ""
+ if nargs > 6: tag = cargs[6]
+ if nargs > 7: excludes = cargs[7]
+ rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage ()
+ else:
+ rm.DelRoute (cargs[3], cargs[4], cargs[5])
else:
- Usage ()
+ if cmd == "list":
+ rm.ListRoutes ()
+ elif cmd == "flush":
+ rm.ClearAllRoutes ()
+ else:
+ Usage ()
+except Exception,e:
+ print "Failed:", e.message
+ sys.exit(1)
+
rm.Disconnect ()