summaryrefslogtreecommitdiff
path: root/python/commands
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-04-08 21:51:17 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-04-08 21:51:17 +0000
commit46442dcd6582b39d57ddf3388921f87b0c533ce7 (patch)
treeea284161d3d25b89ef11013e858ae603e8806449 /python/commands
parentae65a6dc955b727f6b092d50831467d564a00c37 (diff)
downloadqpid-python-46442dcd6582b39d57ddf3388921f87b0c533ce7.tar.gz
QPID-908 from tross
+ corrected spec location -s. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@646093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
-rw-r--r--python/commands/qpid-route266
1 files changed, 266 insertions, 0 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
new file mode 100644
index 0000000000..3f29760ef5
--- /dev/null
+++ b/python/commands/qpid-route
@@ -0,0 +1,266 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import getopt
+import sys
+import socket
+import qpid
+from qpid.management import managementClient
+from qpid.peer import Closed
+from qpid.client import Client
+
+def Usage ():
+ print "Usage: qpid-route [OPTIONS] add <dest-broker> <src-broker> <exchange> <routing-key>"
+ print " qpid-route [OPTIONS] del <dest-broker> <src-broker> <exchange> <routing-key>"
+ print " qpid-route [OPTIONS] list <dest-broker>"
+ #print " qpid-route [OPTIONS] load <filename>"
+ print " qpid-route [OPTIONS] flush <dest-broker>"
+ print
+ print "Options:"
+ print " -s [ --spec-file ] PATH (/usr/share/amqp/amqp.0-10-preview.xml)"
+ print " -v [ --verbose ] Verbose output"
+ print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
+ print
+ print " dest-broker and src-broker are in the form: hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000"
+ print
+ #print " If loading the route configuration from a file, the input file has one line per route"
+ #print " in the form:"
+ #print
+ #print " <dest-broker> <src-broker> <exchange> <routing-key>"
+ #print
+ sys.exit (1)
+
+_specpath = "/usr/share/amqp/amqp.0-10-preview.xml"
+_verbose = False
+_quiet = False
+
+class Broker:
+ def __init__ (self, text):
+ colon = text.find (":")
+ if colon == -1:
+ host = text
+ self.port = 5672
+ else:
+ host = text[:colon]
+ self.port = int (text[colon+1:])
+ self.host = socket.gethostbyname (host)
+
+ def name (self):
+ return self.host + ":" + str (self.port)
+
+class RouteManager:
+ def __init__ (self, destBroker):
+ self.dest = Broker (destBroker)
+ self.src = None
+
+ def ConnectToBroker (self):
+ broker = self.dest
+ if _verbose:
+ print "Connecting to broker: %s:%d" % (broker.host, broker.port)
+ try:
+ self.spec = qpid.spec.load (_specpath)
+ self.client = Client (broker.host, broker.port, self.spec)
+ self.client.start ({"LOGIN":"guest","PASSWORD":"guest"})
+ self.channel = self.client.channel (1)
+ self.mclient = managementClient (self.spec)
+ self.mch = self.mclient.addChannel (self.channel)
+ self.mclient.syncWaitForStable (self.mch)
+ except socket.error, e:
+ print "Connect Error:", e
+ exit (1)
+
+ def getLink (self):
+ links = self.mclient.syncGetObjects (self.mch, "link")
+ for link in links:
+ if link.address == self.src.name ():
+ return link
+ return None
+
+ def AddRoute (self, srcBroker, exchange, routingKey):
+ self.src = Broker (srcBroker)
+ mc = self.mclient
+
+ brokers = mc.syncGetObjects (self.mch, "broker")
+ broker = brokers[0]
+
+ link = self.getLink ()
+ if link == None:
+ if _verbose:
+ print "Inter-broker link not found, creating..."
+
+ connectArgs = {}
+ connectArgs["host"] = self.src.host
+ connectArgs["port"] = self.src.port
+ res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ if _verbose:
+ print "Connect method returned:", res.status, res.statusText
+ link = self.getLink ()
+
+ if link == None:
+ print "Protocol Error - Missing link ID"
+ exit (1)
+
+ bridges = mc.syncGetObjects (self.mch, "bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if not _quiet:
+ print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
+ exit (1)
+ exit (0)
+
+ if _verbose:
+ print "Creating inter-broker binding..."
+ bridgeArgs = {}
+ bridgeArgs["src"] = "src"
+ bridgeArgs["dest"] = exchange
+ bridgeArgs["key"] = routingKey
+ bridgeArgs["src_is_queue"] = 0
+ bridgeArgs["src_is_local"] = 0
+ res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+ if _verbose:
+ print "Bridge method returned:", res.status, res.statusText
+
+ def DelRoute (self, srcBroker, exchange, routingKey):
+ self.src = Broker (srcBroker)
+ mc = self.mclient
+
+ link = self.getLink ()
+ if link == None:
+ if not _quiet:
+ print "No link found from %s to %s" % (self.src.name(), self.dest.name())
+ exit (1)
+ exit (0)
+
+ bridges = mc.syncGetObjects (self.mch, "bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if _verbose:
+ print "Closing bridge..."
+ res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ if res.status != 0:
+ print "Error closing bridge: %d - %s" % (res.status, res.statusText)
+ exit (1)
+ if len (bridges) == 1:
+ if _verbose:
+ print "Last bridge on link, closing link..."
+ res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ if res.status != 0:
+ print "Error closing link: %d - %s" % (res.status, res.statusText)
+ exit (1)
+ exit (0)
+ if not _quiet:
+ print "Route not found"
+ exit (1)
+
+ def ListRoutes (self):
+ mc = self.mclient
+ links = mc.syncGetObjects (self.mch, "link")
+ bridges = mc.syncGetObjects (self.mch, "bridge")
+
+ for bridge in bridges:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.id:
+ myLink = link
+ break
+ if myLink != None:
+ print "%s %s %s %s" % (self.dest.name(), myLink.address, bridge.dest, bridge.key)
+
+ def LoadRoutes (self, inFile):
+ pass
+
+ def ClearAllRoutes (self):
+ mc = self.mclient
+ links = mc.syncGetObjects (self.mch, "link")
+ bridges = mc.syncGetObjects (self.mch, "bridge")
+
+ for bridge in bridges:
+ if _verbose:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.id:
+ myLink = link
+ break
+ if myLink != None:
+ print "Deleting Bridge: %s %s %s... " % (myLink.address, bridge.dest, bridge.key),
+ res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.statusText)
+ elif _verbose:
+ print "Ok"
+
+ for link in links:
+ if _verbose:
+ print "Deleting Link: %s... " % link.address,
+ res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.statusText)
+ elif _verbose:
+ print "Ok"
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("verbose", "quiet", "spec-file=")
+ (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vq", longOpts)
+except:
+ Usage ()
+
+for opt in optlist:
+ if opt[0] == "-s" or opt[0] == "--spec-file":
+ _specpath = opt[1]
+ if opt[0] == "-v" or opt[0] == "--verbose":
+ _verbose = True
+ if opt[0] == "-q" or opt[0] == "--quiet":
+ _quiet = True
+
+nargs = len (cargs)
+if nargs < 2:
+ Usage ()
+
+cmd = cargs[0]
+if cmd != "load":
+ rm = RouteManager (cargs[1])
+ rm.ConnectToBroker ()
+
+if cmd == "add" or cmd == "del":
+ if nargs != 5:
+ Usage ()
+ if cmd == "add":
+ rm.AddRoute (cargs[2], cargs[3], cargs[4])
+ else:
+ rm.DelRoute (cargs[2], cargs[3], cargs[4])
+else:
+ if nargs != 2:
+ Usage ()
+
+ if cmd == "list":
+ rm.ListRoutes ()
+ #elif cmd == "load":
+ # rm.LoadRoutes (cargs[1])
+ elif cmd == "flush":
+ rm.ClearAllRoutes ()
+ else:
+ Usage ()
+