summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/examples/README.txt5
-rwxr-xr-xqpid/python/examples/api/receive194
-rwxr-xr-xqpid/python/examples/api/send281
3 files changed, 0 insertions, 480 deletions
diff --git a/qpid/python/examples/README.txt b/qpid/python/examples/README.txt
index 3a3e421a1e..4395160fec 100644
--- a/qpid/python/examples/README.txt
+++ b/qpid/python/examples/README.txt
@@ -14,11 +14,6 @@ api/spout -- A simple messaging client that sends
messages to the target specified on the
command line.
-api/send -- Sends messages to a specified queue.
-
-api/receive -- Receives messages from a specified queue.
- Use with the send example above.
-
api/server -- An example server that process incoming
messages and sends replies.
diff --git a/qpid/python/examples/api/receive b/qpid/python/examples/api/receive
deleted file mode 100755
index f14df277ac..0000000000
--- a/qpid/python/examples/api/receive
+++ /dev/null
@@ -1,194 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import optparse, sys, time
-import statistics
-from qpid.messaging import *
-
-SECOND = 1000
-TIME_SEC = 1000000000
-
-op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address")
-op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
-op.add_option("-a", "--address", type="str", help="address to receive from")
-op.add_option("--connection-options", default={}, help="options for the connection")
-op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit")
-op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting")
-op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever")
-op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)")
-op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)")
-op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
-op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
-op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)")
-op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)")
-op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)")
-op.add_option("--print-content", type="str", default="yes", help="print out message content")
-op.add_option("--print-headers", type="str", default="no", help="print out message headers")
-op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
-op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
-op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
-op.add_option("--report-header", type="str", default="yes", help="Headers on report")
-op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive")
-op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible")
-#op.add_option("--help", default=False, action="store_true", help="print this usage statement")
-
-def getTimeout(timeout, forever):
- if forever:
- return None
- else:
- return SECOND*timeout
-
-
-EOS = "eos"
-SN = "sn"
-
-# Check for duplicate or dropped messages by sequence number
-class SequenceTracker:
- def __init__(self, opts):
- self.opts = opts
- self.lastSn = 0
-
- # Return True if the message should be procesed, false if it should be ignored.
- def track(self, message):
- if not(self.opts.verify_sequence) or (self.opts.ignore_duplicates):
- return True
- sn = message.properties[SN]
- duplicate = (sn <= lastSn)
- dropped = (sn > lastSn+1)
- if self.opts.verify_sequence and dropped:
- raise Exception("Gap in sequence numbers %s-%s" %(lastSn, sn))
- ignore = (duplicate and self.opts.ignore_duplicates)
- if ignore and self.opts.check_redelivered and (not msg.redelivered):
- raise Exception("duplicate sequence number received, message not marked as redelivered!")
- if not duplicate:
- lastSn = sn
- return (not(ignore))
-
-
-def main():
- opts, args = op.parse_args()
- if not opts.address:
- raise Exception("Address must be specified!")
-
- broker = opts.broker
- address = opts.address
- connection = Connection(opts.broker, **opts.connection_options)
-
- try:
- connection.open()
- if opts.failover_updates:
- auto_fetch_reconnect_urls(connection)
- session = connection.session(transactional=(opts.tx))
- receiver = session.receiver(opts.address)
- if opts.capacity > 0:
- receiver.capacity = opts.capacity
- msg = Message()
- count = 0
- txCount = 0
- sequenceTracker = SequenceTracker(opts)
- timeout = getTimeout(opts.timeout, opts.forever)
- done = False
- stats = statistics.ThroughputAndLatency()
- reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)
-
- if opts.ready_address is not None:
- session.sender(opts.ready_address).send(msg)
- if opts.tx > 0:
- session.commit()
- # For receive rate calculation
- start = time.time()*TIME_SEC
- interval = 0
- if opts.receive_rate > 0:
- interval = TIME_SEC / opts.receive_rate
-
- replyTo = {} # a dictionary of reply-to address -> sender mapping
-
- while (not done):
- try:
- msg = receiver.fetch(timeout=timeout)
- reporter.message(msg)
- if sequenceTracker.track(msg):
- if msg.content == EOS:
- done = True
- else:
- count+=1
- if opts.print_headers == "yes":
- if msg.subject is not None:
- print "Subject: %s" %msg.subject
- if msg.reply_to is not None:
- print "ReplyTo: %s" %msg.reply_to
- if msg.correlation_id is not None:
- print "CorrelationId: %s" %msg.correlation_id
- if msg.user_id is not None:
- print "UserId: %s" %msg.user_id
- if msg.ttl is not None:
- print "TTL: %s" %msg.ttl
- if msg.priority is not None:
- print "Priority: %s" %msg.priority
- if msg.durable:
- print "Durable: true"
- if msg.redelivered:
- print "Redelivered: true"
- print "Properties: %s" %msg.properties
- print
- if opts.print_content == "yes":
- print msg.content
- if (opts.messages > 0) and (count >= opts.messages):
- done = True
- # end of "if sequenceTracker.track(msg):"
- if (opts.tx > 0) and (count % opts.tx == 0):
- txCount+=1
- if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0):
- session.rollback()
- else:
- session.commit()
- elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0):
- session.acknowledge()
- if msg.reply_to is not None: # Echo message back to reply-to address.
- if msg.reply_to not in replyTo:
- replyTo[msg.reply_to] = session.sender(msg.reply_to)
- replyTo[msg.reply_to].capacity = opts.capacity
- replyTo[msg.reply_to].send(msg)
- if opts.receive_rate > 0:
- delay = start + count*interval - time.time()*TIME_SEC
- if delay > 0:
- time.sleep(delay)
- # Clear out message properties & content for next iteration.
- msg = Message()
- except Empty: # no message fetched => break the while cycle
- break
- # end of while cycle
- if opts.report_total:
- reporter.report()
- if opts.tx > 0:
- txCount+=1
- if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0):
- session.rollback()
- else:
- session.commit()
- else:
- session.acknowledge()
- session.close()
- connection.close()
- except Exception,e:
- print e
- connection.close()
-
-if __name__ == "__main__": main()
diff --git a/qpid/python/examples/api/send b/qpid/python/examples/api/send
deleted file mode 100755
index b0105e41a6..0000000000
--- a/qpid/python/examples/api/send
+++ /dev/null
@@ -1,281 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import optparse, random, os, time, uuid
-from qpid.messaging import *
-import statistics
-
-EOS = "eos"
-SN = "sn"
-TS = "ts"
-
-TIME_SEC = 1000000000
-SECOND = 1000
-
-def nameval(st):
- idx = st.find("=")
- if idx >= 0:
- name = st[0:idx]
- value = st[idx+1:]
- else:
- name = st
- value = None
- return name, value
-
-
-op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address")
-op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
-op.add_option("-a", "--address", type="str", help="address to send to")
-op.add_option("--connection-options", default={}, help="options for the connection")
-op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit")
-op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one")
-op.add_option("--reply-to", type="str", help="specify reply-to address")
-op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input")
-op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable")
-op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds")
-op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)")
-op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property")
-op.add_option("--correlation-id", type="str", help="correlation-id for message")
-op.add_option("--user-id", type="str", help="userid for message")
-op.add_option("--content-string", type="str", help="use CONTENT as message content")
-op.add_option("--content-size", default=0, type="int", help="create an N-byte message content")
-op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content")
-op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message")
-op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
-op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)")
-op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)")
-op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
-op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
-op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
-op.add_option("--report-header", type="str", default="yes", help="Headers on report")
-op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible")
-op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
-op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)")
-op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)")
-op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier")
-op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifers with 'STRING' prefix (if group-key specified)")
-op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)")
-op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)")
-op.add_option("--group-interleave", default=1, type="int", help="Simultaineously interleave messages from N different groups (if group-key specified)")
-
-
-class ContentGenerator:
- def setContent(self, msg):
- return
-
-class GetlineContentGenerator(ContentGenerator):
- def setContent(self, msg):
- content = sys.stdin.readline()
- got = (not line)
- if (got):
- msg.content = content
- return got
-
-class FixedContentGenerator(ContentGenerator):
- def __init__(self, content=None):
- self.content = content
-
- def setContent(self, msg):
- msg.content = self.content
- return True
-
-class MapContentGenerator(ContentGenerator):
- def __init__(self, opts=None):
- self.opts = opts
-
- def setContent(self, msg):
- self.content = {}
- for e in self.opts.content_map:
- name, val = nameval(p)
- content[name] = val
- msg.content = self.content
- return True
-
-
-# tag each generated message with a group identifer
-class GroupGenerator:
- def __init__(self, key, prefix, size, randomize, interleave):
- groupKey = key
- groupPrefix = prefix
- groupSize = size
- randomizeSize = randomize
- groupSuffix = 0
- if (randomize > 0):
- random.seed(os.getpid())
-
- for i in range(0, interleave):
- self.newGroup()
- current = 0
-
- def setGroupInfo(self, msg):
- if (current == len(groups)):
- current = 0
- my_group = groups[current]
- msg.properties[groupKey] = my_group[id];
- # print "SENDING GROUPID=[%s]\n" % my_group[id]
- my_group[count]=my_group[count]+1
- if (my_group[count] == my_group[size]):
- self.newGroup()
- del groups[current]
- else:
- current+=1
-
- def newGroup(self):
- groupId = "%s%s" % (groupPrefix, groupSuffix)
- groupSuffix+=1
- size = groupSize
- if (randomizeSize == True):
- size = random.randint(1,groupSize)
- # print "New group: GROUPID=["%s] size=%s" % (groupId, size)
- groups.append({'id':groupId, 'size':size, 'count':0})
-
-
-
-def main():
- opts, args = op.parse_args()
- if not opts.address:
- raise Exception("Address must be specified!")
-
- broker = opts.broker
- address = opts.address
- connection = Connection(opts.broker, **opts.connection_options)
-
- try:
- connection.open()
- if (opts.failover_updates):
- auto_fetch_reconnect_urls(connection)
- session = connection.session(transactional=(opts.tx))
- sender = session.sender(opts.address)
- if (opts.capacity>0):
- sender.capacity = opts.capacity
- sent = 0
- txCount = 0
- stats = statistics.Throughput()
- reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)
-
- contentGen = ContentGenerator()
- content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message
- if opts.content_stdin:
- opts.messages = 0 # Don't limit number of messages sent.
- contentGen = GetlineContentGenerator()
- elif opts.content_map is not None:
- contentGen = MapContentGenerator(opts)
- content = {}
- elif opts.content_size is not None:
- contentGen = FixedContentGenerator('X' * opts.content_size)
- else:
- contentGen = FixedContentGenerator(opts.content_string)
- if opts.group_key is not None:
- groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave)
-
- msg = Message(content=content)
- msg.durable = opts.durable
- if opts.ttl:
- msg.ttl = opts.ttl/1000.0
- if opts.priority:
- msg.priority = opts.priority
- if opts.reply_to is not None:
- if opts.flow_control > 0:
- raise Exception("Can't use reply-to and flow-control together")
- msg.reply_to = opts.reply_to
- if opts.user_id is not None:
- msg.user_id = opts.user_id
- if opts.correlation_id is not None:
- msg.correlation_id = opts.correlation_id
- for p in opts.property:
- name, val = nameval(p)
- msg.properties[name] = val
-
- start = time.time()*TIME_SEC
- interval = 0
- if opts.send_rate > 0:
- interval = TIME_SEC/opts.send_rate
-
- flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}"
- flowSent = 0
- if opts.flow_control > 0:
- flowControlReceiver = session.receiver(flowControlAddress)
- flowControlReceiver.capacity = 2
-
- while (contentGen.setContent(msg) == True):
- sent+=1
- if opts.sequence == "yes":
- msg.properties[SN] = sent
-
- if opts.flow_control > 0:
- if (sent % opts.flow_control == 0):
- msg.reply_to = flowControlAddress
- flowSent+=1
- else:
- msg.reply_to = "" # Clear the reply address.
-
- if 'groupGen' in vars():
- groupGen.setGroupInfo(msg)
-
- if (opts.timestamp == "yes"):
- msg.properties[TS] = int(time.time()*TIME_SEC)
- sender.send(msg)
- reporter.message(msg)
-
- if ((opts.tx > 0) and (sent % opts.tx == 0)):
- txCount+=1
- if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)):
- session.rollback()
- else:
- session.commit()
- if ((opts.messages > 0) and (sent >= opts.messages)):
- break
-
- if (opts.flow_control > 0) and (flowSent == 2):
- flowControlReceiver.fetch(timeout=SECOND)
- flowSent -= 1
-
- if (opts.send_rate > 0):
- delay = start + sent*interval - time.time()*TIME_SEC
- if (delay > 0):
- time.sleep(delay)
- #end of while
-
- while flowSent > 0:
- flowControlReceiver.fetch(timeout=SECOND)
- flowSent -= 1
-
- if (opts.report_total):
- reporter.report()
- for i in reversed(range(1,opts.send_eos+1)):
- if (opts.sequence == "yes"):
- sent+=1
- msg.properties[SN] = sent
- msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar
- sender.send(msg)
- if ((opts.tx > 0) and (sent % opts.tx == 0)):
- txCount+=1
- if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)):
- session.rollback()
- else:
- session.commit()
- session.sync()
- session.close()
- connection.close()
- except Exception,e:
- print e
- connection.close()
-
-if __name__ == "__main__": main()