diff options
| author | Alan Conway <aconway@apache.org> | 2007-11-30 17:11:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-11-30 17:11:47 +0000 |
| commit | 854f0fecc29299c3abbd9a95331acc63fa7ada62 (patch) | |
| tree | b316dbc2d1ae95b546432d1cd2b5b03d8ad69ddb /python/examples/fanout | |
| parent | 599b50264cfb4ff75728264755e5ed4efef1fe83 (diff) | |
| download | qpid-python-854f0fecc29299c3abbd9a95331acc63fa7ada62.tar.gz | |
Python examples
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599876 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/examples/fanout')
| -rw-r--r-- | python/examples/fanout/config_fanout_exchange.py | 54 | ||||
| -rw-r--r-- | python/examples/fanout/fanout_consumer.py | 75 | ||||
| -rw-r--r-- | python/examples/fanout/fanout_producer.py | 49 |
3 files changed, 178 insertions, 0 deletions
diff --git a/python/examples/fanout/config_fanout_exchange.py b/python/examples/fanout/config_fanout_exchange.py new file mode 100644 index 0000000000..6eef1b94e3 --- /dev/null +++ b/python/examples/fanout/config_fanout_exchange.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +""" + config_direct_exchange.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Create a queue ------------------------------------- + +# Create a queue named "listener" on channel 1, and bind it +# to the "amq.fanout" exchange. +# +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.queue_bind(exchange="amq.fanout", queue="message_queue") + +#----- Cleanup --------------------------------------------- + +# Clean up before exiting so there are no open threads. +# channel.session_close() + diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py new file mode 100644 index 0000000000..e9a2291f4c --- /dev/null +++ b/python/examples/fanout/fanout_consumer.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +consumer_tag = "consumer1" +queue = client.queue(consumer_tag) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=consumer_tag) +session.message_flow(consumer_tag, 0, 0xFFFFFFFF) +session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.content.body + print content + +# Messages are not removed from the queue until they are +# acknowledged. Using cumulative=True, all messages from the session +# up to and including the one identified by the delivery tag are +# acknowledged. This is more efficient, because there are fewer +# network round-trips. + +message.complete(cumulative=True) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.session_close() diff --git a/python/examples/fanout/fanout_producer.py b/python/examples/fanout/fanout_producer.py new file mode 100644 index 0000000000..42570ed510 --- /dev/null +++ b/python/examples/fanout/fanout_producer.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +for i in range(10): + message = Content(body="message " + str(i)) + session.message_transfer(destination="amq.fanout", content=message) + +final="That's all, folks!" +message=Content(final) +session.message_transfer(destination="amq.fanout", content=message) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() + |
