summaryrefslogtreecommitdiff
path: root/qpid/tools
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
commit6010991600d605d2a82a0c64a105a7ceabecffae (patch)
treeb0ef3181b068fe2b5d187610810f44b264da8c1d /qpid/tools
parentaebc3e80cccef161d51257bd958331bc5d48768e (diff)
downloadqpid-python-6010991600d605d2a82a0c64a105a7ceabecffae.tar.gz
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools')
-rwxr-xr-xqpid/tools/src/py/qpid-config22
1 files changed, 21 insertions, 1 deletions
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index cd80e26a1e..1ee35da8c3 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -96,6 +96,8 @@ class Config:
self._flowResumeCount = None
self._flowStopSize = None
self._flowResumeSize = None
+ self._msgGroupHeader = None
+ self._sharedMsgGroup = False
self._extra_arguments = []
self._returnCode = 0
@@ -116,13 +118,16 @@ FLOW_STOP_COUNT = "qpid.flow_stop_count"
FLOW_RESUME_COUNT = "qpid.flow_resume_count"
FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP = "qpid.shared_msg_group"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
#arguments for which there are specific program options defined
#i.e. the arguments for which there is special processing on add and
#list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -182,6 +187,10 @@ def OptionsAndArguments(argv):
help="Turn on sender flow control when the number of queued messages exceeds this value.")
group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
help="Turn off sender flow control when the number of queued messages drops below this value.")
+ group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+ help="Enable message groups. Specify name of header that holds group identifier.")
+ group3.add_option("--shared-groups", action="store_true",
+ help="Allow message group consumption across multiple consumers.")
group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
@@ -263,6 +272,10 @@ def OptionsAndArguments(argv):
config._flowStopCount = opts.flow_stop_count
if opts.flow_resume_count:
config._flowResumeCount = opts.flow_resume_count
+ if opts.group_header:
+ config._msgGroupHeader = opts.group_header
+ if opts.shared_groups:
+ config._sharedMsgGroup = True
if opts.extra_arguments:
config._extra_arguments = opts.extra_arguments
return args
@@ -442,6 +455,8 @@ class BrokerManager:
if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+ if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+ if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
def QueueListRecurse(self, filter):
@@ -534,6 +549,11 @@ class BrokerManager:
if config._flowResumeCount:
declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+ if config._msgGroupHeader:
+ declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+ if config._sharedMsgGroup:
+ declArgs[SHARED_MSG_GROUP] = 1
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else: