summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-06-26 12:57:43 +0000
committerTed Ross <tross@apache.org>2009-06-26 12:57:43 +0000
commit4072ab6f9e4d5eafb3ffa25cb8538dedde8360de (patch)
treec21af982e89753690e62751bb1cd74ec7d2cee7d
parent139f378983f9da23af928decab67afc0eb62c324 (diff)
downloadqpid-python-4072ab6f9e4d5eafb3ffa25cb8538dedde8360de.tar.gz
Added --timeout options to cli tools.
Cli tools will not hang indefinitely if the broker is non-responsive. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@788681 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xpython/commands/qpid-cluster14
-rwxr-xr-xpython/commands/qpid-config10
-rwxr-xr-xpython/commands/qpid-route14
-rwxr-xr-xpython/commands/qpid-stat23
-rwxr-xr-xpython/commands/qpid-tool4
-rw-r--r--python/qmf/console.py27
-rw-r--r--python/qpid/managementdata.py14
7 files changed, 74 insertions, 32 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster
index 07fa666041..f196a6e2b0 100755
--- a/python/commands/qpid-cluster
+++ b/python/commands/qpid-cluster
@@ -28,6 +28,7 @@ import re
from qmf.console import Session
_host = "localhost"
+_connTimeout = 10
_stopId = None
_stopAll = False
_force = False
@@ -42,6 +43,7 @@ def Usage ():
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
print " -C [--all-connections] View client connections to all cluster members"
print " -c [--connections] ID View client connections to specified member"
print " -d [--del-connection] HOST:PORT"
@@ -88,7 +90,7 @@ class BrokerManager:
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -200,7 +202,7 @@ class BrokerManager:
idx = 0
for host in hostList:
if _showConn == "all" or _showConn == idList[idx] or _delConn:
- self.brokers.append(self.qmf.addBroker(host))
+ self.brokers.append(self.qmf.addBroker(host, _connTimeout))
displayList.append(idList[idx])
idx += 1
@@ -247,7 +249,7 @@ class BrokerManager:
##
try:
- longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric")
+ longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n", longOpts)
except:
Usage()
@@ -260,6 +262,10 @@ except:
count = 0
for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "-s" or opt[0] == "--stop":
_stopId = opt[1]
if len(_stopId.split(":")) != 2:
@@ -316,7 +322,7 @@ except Exception,e:
if e.__repr__().find("connection aborted") > 0:
# we expect this when asking the connected broker to shut down
sys.exit(0)
- print "Failed:", e.args
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
bm.Disconnect()
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
index 59145620cd..9204a1faa7 100755
--- a/python/commands/qpid-config
+++ b/python/commands/qpid-config
@@ -27,6 +27,7 @@ from qmf.console import Session
_recursive = False
_host = "localhost"
+_connTimeout = 10
_altern_ex = None
_passive = False
_durable = False
@@ -67,6 +68,7 @@ def Usage ():
print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
print " -b [ --bindings ] Show bindings in queue or exchange list"
print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker"
print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
@@ -135,7 +137,7 @@ class BrokerManager:
def SetBroker (self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -371,7 +373,7 @@ try:
longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
"file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
"order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty",
- "force_if_used", "alternate-exchange=", "passive")
+ "force_if_used", "alternate-exchange=", "passive", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
except:
Usage ()
@@ -387,6 +389,10 @@ for opt in optlist:
_recursive = True
if opt[0] == "-a" or opt[0] == "--broker-addr":
_host = opt[1]
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "--alternate-exchange":
_altern_ex = opt[1]
if opt[0] == "--passive":
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index 324ce2e176..b515b91267 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -43,6 +43,7 @@ def Usage():
print " qpid-route [OPTIONS] link list [<dest-broker>]"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
print " -v [ --verbose ] Verbose output"
print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
print " -d [ --durable ] Added configuration shall be durable"
@@ -64,13 +65,14 @@ _dellink = False
_srclocal = False
_transport = "tcp"
_ack = 0
+_connTimeout = 10
class RouteManager:
def __init__(self, localBroker):
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
- self.broker = self.qmf.addBroker(localBroker)
+ self.broker = self.qmf.addBroker(localBroker, _connTimeout)
def disconnect(self):
self.qmf.delBroker(self.broker)
@@ -143,7 +145,7 @@ class RouteManager:
if url.name() not in brokerList:
print " %s..." % url.name(),
try:
- b = qmf.addBroker("%s:%d" % (link.host, link.port))
+ b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
brokerList[url.name()] = b
added = True
print "Ok"
@@ -403,7 +405,7 @@ def YN(val):
##
try:
- longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=")
+ longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except:
Usage()
@@ -415,6 +417,10 @@ except:
cargs = encArgs
for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "-v" or opt[0] == "--verbose":
_verbose = True
if opt[0] == "-q" or opt[0] == "--quiet":
@@ -512,7 +518,7 @@ try:
Usage()
except Exception,e:
- print "Failed:", e.args
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
rm.disconnect()
diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat
index 26860e5853..696ff5f954 100755
--- a/python/commands/qpid-stat
+++ b/python/commands/qpid-stat
@@ -29,7 +29,7 @@ from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter
_host = "localhost"
-_top = False
+_connTimeout = 10
_types = ""
_limit = 50
_increasing = False
@@ -42,10 +42,10 @@ def Usage ():
print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
-# print "General Options:"
+ print "General Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
# print " -n [--numeric] Don't resolve names"
-# print " -t [--top] Repeatedly display top items"
-# print
+ print
print "Display Options:"
print
print " -b Show Brokers"
@@ -144,7 +144,7 @@ class BrokerManager(Console):
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -389,7 +389,7 @@ class BrokerManager(Console):
self.qmf.delBroker(self.broker)
self.broker = None
for host in hostList:
- b = self.qmf.addBroker(host)
+ b = self.qmf.addBroker(host, _connTimeout)
self.brokers.append(Broker(self.qmf, b))
else:
self.brokers.append(Broker(self.qmf, self.broker))
@@ -402,7 +402,7 @@ class BrokerManager(Console):
##
try:
- longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing")
+ longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
except:
Usage()
@@ -414,8 +414,10 @@ except:
cargs = encArgs
for opt in optlist:
- if opt[0] == "-t" or opt[0] == "--top":
- _top = True
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
elif opt[0] == "-n" or opt[0] == "--numeric":
_numeric = True
elif opt[0] == "-S" or opt[0] == "--sort-by":
@@ -448,8 +450,7 @@ try:
except KeyboardInterrupt:
print
except Exception,e:
- print "Failed:", e.args
- #raise # TODO: Remove before flight
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
bm.Disconnect()
diff --git a/python/commands/qpid-tool b/python/commands/qpid-tool
index 14308f69fb..05afcc9732 100755
--- a/python/commands/qpid-tool
+++ b/python/commands/qpid-tool
@@ -24,7 +24,7 @@ import getopt
import sys
import socket
from cmd import Cmd
-from qpid.connection import ConnectionFailed
+from qpid.connection import ConnectionFailed, Timeout
from qpid.managementdata import ManagementData
from shlex import split
from qpid.disp import Display
@@ -183,6 +183,8 @@ except ConnectionFailed, e:
except Exception, e:
if str(e).find ("Exchange not found") != -1:
print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
# Instantiate the CLI interpreter and launch it.
diff --git a/python/qmf/console.py b/python/qmf/console.py
index 1527c7849a..828023f132 100644
--- a/python/qmf/console.py
+++ b/python/qmf/console.py
@@ -469,11 +469,11 @@ class Session:
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
- def addBroker(self, target="localhost"):
+ def addBroker(self, target="localhost", timeout=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
- ssl = url.scheme == URL.AMQPS)
+ ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
self.brokers.append(broker)
if not self.manageConnections:
@@ -1551,11 +1551,12 @@ class Broker:
SYNC_TIME = 60
nextSeq = 1
- def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
+ def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False, connTimeout=None):
self.session = session
self.host = host
self.port = port
self.ssl = ssl
+ self.connTimeout = connTimeout
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
@@ -1641,13 +1642,21 @@ class Broker:
sock = connect(self.host, self.port)
sock.settimeout(5)
+ oldTimeout = sock.gettimeout()
+ sock.settimeout(self.connTimeout)
if self.ssl:
- sock = ssl(sock)
- self.conn = Connection(sock, username=self.authUser, password=self.authPass, heartbeat=2)
+ connSock = ssl(sock)
+ else:
+ connSock = sock
+ self.conn = Connection(connSock, username=self.authUser, password=self.authPass)
def aborted():
- raise Timeout("read timed out")
+ raise Timeout("Waiting for connection to be established with broker")
+ oldAborted = self.conn.aborted
self.conn.aborted = aborted
self.conn.start()
+ sock.settimeout(oldTimeout)
+ self.conn.aborted = oldAborted
+
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
self.amqpSession.auto_sync = True
@@ -1681,13 +1690,13 @@ class Broker:
self._send(msg)
except socket.error, e:
- self.error = "Socket Error %s - %s" % (e[0], e[1])
+ self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
raise
except Closed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ self.error = "Connect Failed %d - %s" % (e.__class__.__name__, e)
raise
except ConnectionFailed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ self.error = "Connect Failed %d - %s" % (e.__class__.__name__, e)
raise
def _updateAgent(self, obj):
diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py
index 84eb9c3ff8..c0d32d46cf 100644
--- a/python/qpid/managementdata.py
+++ b/python/qpid/managementdata.py
@@ -31,6 +31,7 @@ import struct
import os
import platform
import locale
+from qpid.connection import Timeout
from qpid.management import managementChannel, managementClient
from threading import Lock
from disp import Display
@@ -206,11 +207,22 @@ class ManagementData:
self.sessionId = "%s.%d" % (platform.uname()[1], os.getpid())
self.broker = Broker (host)
- self.conn = Connection (connect (self.broker.host, self.broker.port),
+ sock = connect (self.broker.host, self.broker.port)
+ oldTimeout = sock.gettimeout()
+ sock.settimeout(10)
+ self.conn = Connection (sock,
username=self.broker.username, password=self.broker.password)
self.spec = self.conn.spec
+ def aborted():
+ raise Timeout("Waiting for connection to be established with broker")
+ oldAborted = self.conn.aborted
+ self.conn.aborted = aborted
+
self.conn.start ()
+ sock.settimeout(oldTimeout)
+ self.conn.aborted = oldAborted
+
self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
self.instHandler, self.methodReply, self.closeHandler)
self.mclient.schemaListener (self.schemaHandler)