summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-24 21:27:51 +0000
committerAlan Conway <aconway@apache.org>2012-02-24 21:27:51 +0000
commitd8740d624b2152b7b5e45e4655dfd1255544a3a4 (patch)
tree4885abb57282d2eb349792e9396de7f8a8f4385e /qpid
parent67d8640e8d9315a22c1f54fce885ca8c80b09b2c (diff)
downloadqpid-python-d8740d624b2152b7b5e45e4655dfd1255544a3a4.tar.gz
QPID-3603: Add support for replicated queues to qpid-config.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1293428 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-xqpid/tools/src/py/qpid-config33
-rwxr-xr-xqpid/tools/src/py/qpid-ha11
2 files changed, 36 insertions, 8 deletions
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index 60fc53ce4b..007b25af86 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -62,14 +62,22 @@ Add Exchange <type> values:
xml XML Exchange - allows content filtering using an XQuery
-Queue Limit Actions
+Queue Limit Actions:
none (default) - Use broker's default policy
reject - Reject enqueued messages
flow-to-disk - Page messages to disk
ring - Replace oldest unacquired message with new
- ring-strict - Replace oldest message, reject if oldest is acquired"""
+ ring-strict - Replace oldest message, reject if oldest is acquired
+Replicate levels:
+
+ none - no replication
+ configuration - replicate queue and exchange existence and bindings, but not messages.
+ messages - replicate configuration and messages
+"""
+
+REPLICATE_LEVELS= ["none", "configuration", "messages"]
class Config:
def __init__(self):
@@ -79,6 +87,7 @@ class Config:
self._ignoreDefault = False
self._altern_ex = None
self._durable = False
+ self._replicate = None
self._clusterDurable = False
self._if_empty = True
self._if_unused = True
@@ -120,14 +129,18 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
MSG_GROUP_HDR_KEY = "qpid.group_header_key"
SHARED_MSG_GROUP = "qpid.shared_msg_group"
+REPLICATE = "qpid.replicate"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
#arguments for which there are specific program options defined
#i.e. the arguments for which there is special processing on add and
#list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
- MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
+SPECIAL_ARGS=[
+ FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,
+ LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
+ FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -145,7 +158,7 @@ class JHelpFormatter(IndentedHelpFormatter):
def Usage():
print usage
- exit(-1)
+ sys.exit(-1)
def OptionsAndArguments(argv):
""" Set global variables for options, return arguments """
@@ -171,6 +184,7 @@ def OptionsAndArguments(argv):
group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
+ group2.add_option("--replicate", action="store", metavar="<level>", help="Replication level for the new queue or exchange (none, configuration or messages).")
parser.add_option_group(group2)
group3 = OptionGroup(parser, "Options for Adding Queues")
@@ -236,6 +250,10 @@ def OptionsAndArguments(argv):
config._altern_ex = opts.alternate_exchange
if opts.durable:
config._durable = True
+ if opts.replicate:
+ if not opts.replicate in REPLICATE_LEVELS:
+ raise Exception("Invalid replicate level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
+ config._replicate = opts.replicate
if opts.cluster_durable:
config._clusterDurable = True
if opts.file:
@@ -442,6 +460,7 @@ class BrokerManager:
args = q.arguments
if not args: args = {}
if q.durable: print "--durable",
+ if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
if q.autoDelete: print "auto-del",
if q.exclusive: print "excl",
@@ -503,6 +522,8 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
+ if config._replicate:
+ declArgs[REPLICATE] = config._replicate
self.broker.addExchange(etype, ename, declArgs)
@@ -569,6 +590,8 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
+ if config._replicate:
+ declArgs[REPLICATE] = config._replicate
self.broker.addQueue(qname, declArgs)
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index d81a7a466c..90e11fc5ab 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -139,16 +139,21 @@ HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
class Command:
commands = {}
- def __init__(self, name, help, args=""):
+ def __init__(self, name, help, args=[]):
Command.commands[name] = self
self.name = name
- usage="%s [options]%s\n\n%s"%(name, args, help)
+ self.args = args
+ usage="%s [options] %s\n\n%s"%(name, " ".join(args), help)
self.help = help
self.op=optparse.OptionParser(usage)
self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
def execute(self, command):
opts, args = self.op.parse_args(command)
+ if len(args) != len(self.args)+1:
+ self.op.print_help()
+ print "Error: wrong number of arguments"
+ return
broker = opts.broker or "localhost:5672"
# FIXME aconway 2012-02-23: enforce not doing primary-only operations on a backup & vice versa
connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
@@ -183,7 +188,7 @@ ReadyCmd()
class ReplicateCmd(Command):
def __init__(self):
- Command.__init__(self, "replicate", "Replicate <queue> from <broker> to the current broker.", "<queue> <broker>")
+ Command.__init__(self, "replicate", "Replicate <queue> from broker <primary> to the current broker.", ["<queue>", "<primary>"])
ReplicateCmd()
class SetCmd(Command):