summaryrefslogtreecommitdiff
path: root/python/examples/fanout/fanout_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/examples/fanout/fanout_consumer.py')
-rwxr-xr-xpython/examples/fanout/fanout_consumer.py92
1 files changed, 57 insertions, 35 deletions
diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py
index b91ea35c0d..ef24bf35b2 100755
--- a/python/examples/fanout/fanout_consumer.py
+++ b/python/examples/fanout/fanout_consumer.py
@@ -5,13 +5,57 @@
This AMQP client reads messages from a message
queue named "message_queue".
"""
-
+import base64
import qpid
import sys
from qpid.client import Client
from qpid.content import Content
from qpid.queue import Empty
+#----- Functions -------------------------------------------
+
+def dump_queue(client, queue_name):
+
+ print "Messages queue: " + queue_name
+
+ consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag
+ queue = client.queue(consumer_tag)
+
+ # Call message_subscribe() to tell the broker to deliver messages
+ # from the AMQP queue to a local client queue. The broker will
+ # start delivering messages as soon as message_subscribe() is called.
+
+ session.message_subscribe(queue=queue_name, destination=consumer_tag)
+ session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
+ session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
+
+ print "Subscribed to queue " + queue_name
+ sys.stdout.flush()
+
+ message = 0
+
+ while True:
+ try:
+ message = queue.get(timeout=10)
+ content = message.content.body
+ print "Response: " + content
+ except Empty:
+ print "No more messages!"
+ break
+ except:
+ print "Unexpected exception!"
+ break
+
+
+ # Messages are not removed from the queue until they
+ # are acknowledged. Using cumulative=True, all messages
+ # in the session up to and including the one identified
+ # by the delivery tag are acknowledged. This is more efficient,
+ # because there are fewer network round-trips.
+
+ if message != 0:
+ message.complete(cumulative=True)
+
#----- Initialization --------------------------------------
@@ -29,44 +73,22 @@ client = Client(host, port, qpid.spec.load(amqp_spec))
client.start({"LOGIN": user, "PASSWORD": password})
session = client.session()
-session.session_open()
-
-#----- Read from queue --------------------------------------------
-
-# Now let's create a local client queue and tell it to read
-# incoming messages.
-
-# The consumer tag identifies the client-side queue.
-
-consumer_tag = "consumer1"
-queue = client.queue(consumer_tag)
-
-# Call message_subscribe() to tell the broker to deliver messages
-# from the AMQP queue to this local client queue. The broker will
-# start delivering messages as soon as message_subscribe() is called.
-
-session.message_subscribe(queue="message_queue", destination=consumer_tag)
-session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
-session.message_flow(consumer_tag, 1, 0xFFFFFFFF)
-
-# Initialize 'final' and 'content', variables used to identify the last message.
+session_info = session.session_open()
+session_id = session_info.session_id
-final = "That's all, folks!" # In a message body, signals the last message
-content = "" # Content of the last message read
+#----- Main Body -- ----------------------------------------
-message = None
-while content != final:
- message = queue.get(timeout=10)
- content = message.content.body
- print content
+# Make a unique queue name for my queue from the session ID.
+my_queue = base64.urlsafe_b64encode(session_id)
+session.queue_declare(queue=my_queue)
-# Messages are not removed from the queue until they are
-# acknowledged. Using cumulative=True, all messages from the session
-# up to and including the one identified by the delivery tag are
-# acknowledged. This is more efficient, because there are fewer
-# network round-trips.
+# Bind my queue to the fanout exchange. No routing key is required
+# the fanout exchange copies messages unconditionally to every
+# bound queue
+session.queue_bind(queue=my_queue, exchange="amq.fanout")
-message.complete(cumulative=True)
+# Dump the messages on the queue.
+dump_queue(client, my_queue)
#----- Cleanup ------------------------------------------------