diff options
| author | Alan Conway <aconway@apache.org> | 2008-02-15 21:00:44 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-02-15 21:00:44 +0000 |
| commit | 5034a2088eb20620225d2dfd4c2ae40b42b4c57d (patch) | |
| tree | 943081284fd77a33adffe3e1203528659237deff /qpid/python/examples/fanout/fanout_consumer.py | |
| parent | 491b221e39621eb9977c6bfa76f3587870d0d3a4 (diff) | |
| download | qpid-python-5034a2088eb20620225d2dfd4c2ae40b42b4c57d.tar.gz | |
Updated c++ and python fanout examples and verify scripts.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@628169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/examples/fanout/fanout_consumer.py')
| -rwxr-xr-x | qpid/python/examples/fanout/fanout_consumer.py | 92 |
1 files changed, 57 insertions, 35 deletions
diff --git a/qpid/python/examples/fanout/fanout_consumer.py b/qpid/python/examples/fanout/fanout_consumer.py index b91ea35c0d..ef24bf35b2 100755 --- a/qpid/python/examples/fanout/fanout_consumer.py +++ b/qpid/python/examples/fanout/fanout_consumer.py @@ -5,13 +5,57 @@ This AMQP client reads messages from a message queue named "message_queue". """ - +import base64 import qpid import sys from qpid.client import Client from qpid.content import Content from qpid.queue import Empty +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call message_subscribe() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as message_subscribe() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + print "Subscribed to queue " + queue_name + sys.stdout.flush() + + message = 0 + + while True: + try: + message = queue.get(timeout=10) + content = message.content.body + print "Response: " + content + except Empty: + print "No more messages!" + break + except: + print "Unexpected exception!" + break + + + # Messages are not removed from the queue until they + # are acknowledged. Using cumulative=True, all messages + # in the session up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + #----- Initialization -------------------------------------- @@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec)) client.start({"LOGIN": user, "PASSWORD": password}) session = client.session() -session.session_open() - -#----- Read from queue -------------------------------------------- - -# Now let's create a local client queue and tell it to read -# incoming messages. - -# The consumer tag identifies the client-side queue. - -consumer_tag = "consumer1" -queue = client.queue(consumer_tag) - -# Call message_subscribe() to tell the broker to deliver messages -# from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_subscribe() is called. - -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) - -# Initialize 'final' and 'content', variables used to identify the last message. +session_info = session.session_open() +session_id = session_info.session_id -final = "That's all, folks!" # In a message body, signals the last message -content = "" # Content of the last message read +#----- Main Body -- ---------------------------------------- -message = None -while content != final: - message = queue.get(timeout=10) - content = message.content.body - print content +# Make a unique queue name for my queue from the session ID. +my_queue = base64.urlsafe_b64encode(session_id) +session.queue_declare(queue=my_queue) -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. +# Bind my queue to the fanout exchange. No routing key is required +# the fanout exchange copies messages unconditionally to every +# bound queue +session.queue_bind(queue=my_queue, exchange="amq.fanout") -message.complete(cumulative=True) +# Dump the messages on the queue. +dump_queue(client, my_queue) #----- Cleanup ------------------------------------------------ |
