diff options
author | Alan Conway <aconway@apache.org> | 2007-11-30 17:11:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-30 17:11:47 +0000 |
commit | 854f0fecc29299c3abbd9a95331acc63fa7ada62 (patch) | |
tree | b316dbc2d1ae95b546432d1cd2b5b03d8ad69ddb | |
parent | 599b50264cfb4ff75728264755e5ed4efef1fe83 (diff) | |
download | qpid-python-854f0fecc29299c3abbd9a95331acc63fa7ada62.tar.gz |
Python examples
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599876 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/examples/direct/config_direct_exchange.py | 57 | ||||
-rw-r--r-- | python/examples/direct/direct_consumer.py | 75 | ||||
-rw-r--r-- | python/examples/direct/direct_producer.py | 51 | ||||
-rw-r--r-- | python/examples/fanout/config_fanout_exchange.py | 54 | ||||
-rw-r--r-- | python/examples/fanout/fanout_consumer.py | 75 | ||||
-rw-r--r-- | python/examples/fanout/fanout_producer.py | 49 | ||||
-rw-r--r-- | python/examples/pubsub/topic_consumer.py | 120 | ||||
-rw-r--r-- | python/examples/pubsub/topic_producer.py | 100 | ||||
-rw-r--r-- | python/examples/request-response/client.py | 112 | ||||
-rw-r--r-- | python/examples/request-response/server.py | 89 |
10 files changed, 782 insertions, 0 deletions
diff --git a/python/examples/direct/config_direct_exchange.py b/python/examples/direct/config_direct_exchange.py new file mode 100644 index 0000000000..45af6f63b9 --- /dev/null +++ b/python/examples/direct/config_direct_exchange.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +""" + config_direct_exchange.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Create a queue ------------------------------------- + +# Create a queue named "listener" on channel 1, and bind it +# to the "amq.direct" exchange. +# +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key") + +#----- Cleanup --------------------------------------------- + +# Clean up before exiting so there are no open threads. +# +# Close Channel 1. +# Close the connection using Channel 0, which is used for all connection methods. +session.session_close() + diff --git a/python/examples/direct/direct_consumer.py b/python/examples/direct/direct_consumer.py new file mode 100644 index 0000000000..4e50dfbc2a --- /dev/null +++ b/python/examples/direct/direct_consumer.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +consumer_tag = "consumer1" +queue = client.queue(consumer_tag) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=consumer_tag) +session.message_flow(consumer_tag, 0, 0xFFFFFFFF) # Kill these? +session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these? + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.content.body + print content + +# Messages are not removed from the queue until they are +# acknowledged. Using cumulative=True, all messages from the session +# up to and including the one identified by the delivery tag are +# acknowledged. This is more efficient, because there are fewer +# network round-trips. + +message.complete(cumulative=True) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +# session.session_close() diff --git a/python/examples/direct/direct_producer.py b/python/examples/direct/direct_producer.py new file mode 100644 index 0000000000..6770e56803 --- /dev/null +++ b/python/examples/direct/direct_producer.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +for i in range(10): + message = Content("message " + str(i)) + message["routing_key"] = "routing_key" + session.message_transfer(destination="amq.direct", content=message) + +final="That's all, folks!" +message = Content(final) +message["routing_key"] = "routing_key" +session.message_transfer(destination="amq.direct", content=message) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() + diff --git a/python/examples/fanout/config_fanout_exchange.py b/python/examples/fanout/config_fanout_exchange.py new file mode 100644 index 0000000000..6eef1b94e3 --- /dev/null +++ b/python/examples/fanout/config_fanout_exchange.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +""" + config_direct_exchange.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Create a queue ------------------------------------- + +# Create a queue named "listener" on channel 1, and bind it +# to the "amq.fanout" exchange. +# +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.queue_bind(exchange="amq.fanout", queue="message_queue") + +#----- Cleanup --------------------------------------------- + +# Clean up before exiting so there are no open threads. +# channel.session_close() + diff --git a/python/examples/fanout/fanout_consumer.py b/python/examples/fanout/fanout_consumer.py new file mode 100644 index 0000000000..e9a2291f4c --- /dev/null +++ b/python/examples/fanout/fanout_consumer.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +consumer_tag = "consumer1" +queue = client.queue(consumer_tag) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=consumer_tag) +session.message_flow(consumer_tag, 0, 0xFFFFFFFF) +session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.content.body + print content + +# Messages are not removed from the queue until they are +# acknowledged. Using cumulative=True, all messages from the session +# up to and including the one identified by the delivery tag are +# acknowledged. This is more efficient, because there are fewer +# network round-trips. + +message.complete(cumulative=True) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.session_close() diff --git a/python/examples/fanout/fanout_producer.py b/python/examples/fanout/fanout_producer.py new file mode 100644 index 0000000000..42570ed510 --- /dev/null +++ b/python/examples/fanout/fanout_producer.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +for i in range(10): + message = Content(body="message " + str(i)) + session.message_transfer(destination="amq.fanout", content=message) + +final="That's all, folks!" +message=Content(final) +session.message_transfer(destination="amq.fanout", content=message) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() + diff --git a/python/examples/pubsub/topic_consumer.py b/python/examples/pubsub/topic_consumer.py new file mode 100644 index 0000000000..5db04573f8 --- /dev/null +++ b/python/examples/pubsub/topic_consumer.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +""" + topic_consumer.py + + This AMQP client reads all messages from the + "news", "weather", "usa", and "europe" queues + created and bound by config_topic_exchange.py. +""" + +import base64 + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call basic_consume() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as basic_consume() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + content = "" # Content of the last message read + final = "That's all, folks!" # In a message body, signals the last message + message = 0 + + while content != final: + try: + message = queue.get() + content = message.content.body + print content + except Empty: + if message != 0: + message.complete(cumulative=True) + print "No more messages!" + return + + + # Messages are not removed from the queue until they + # are acknowledged. Using multiple=True, all messages + # in the channel up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +spec = qpid.spec.load(amqp_spec) +client = Client(host, port, spec) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session = session.session_open() # keep the session object, we'll need the session id + +#----- Main Body -- ---------------------------------------- + + +news = "news" + base64.urlsafe_b64encode(session.session_id) +weather = "weather" + base64.urlsafe_b64encode(session.session_id) +usa = "usa" + base64.urlsafe_b64encode(session.session_id) +europe = "europe" + base64.urlsafe_b64encode(session.session_id) + +session.queue_declare(queue=news, exclusive=True) +session.queue_declare(queue=weather, exclusive=True) +session.queue_declare(queue=usa, exclusive=True) +session.queue_declare(queue=europe, exclusive=True) + +# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather". + +# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches +# "europe.news" or "usa.news". + +session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news") +session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather") +session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#") +session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#") + +# Remind the user to start the topic producer + +print "Queues create - please start the topic producer" + +# Call dump_queue to print messages from each queue + +dump_queue(client, news) +dump_queue(client, weather) +dump_queue(client, usa) +dump_queue(client, europe) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# +# Close Channel 1. + +session.session_close() + diff --git a/python/examples/pubsub/topic_producer.py b/python/examples/pubsub/topic_producer.py new file mode 100644 index 0000000000..5f8372e7ba --- /dev/null +++ b/python/examples/pubsub/topic_producer.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +""" + topic_producer.py + + This is a simple AMQP publisher application that uses a + Topic exchange. The publisher specifies the routing key + and the exchange for each message. +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login. + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +spec = qpid.spec.load(amqp_spec) +client = Client(host, port, spec) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. Use the +# topic exchange. The routing keys are "usa.news", "usa.weather", +# "europe.news", and "europe.weather". + +final = "That's all, folks!" + +# We'll use the same routing key for all messages in the loop, and +# also for the terminating message. + +# usa.news + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "usa.news" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "usa.news" +session.message_transfer(destination="amq.topic", content=message) + +# usa.weather + + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "usa.weather" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "usa.weather" +session.message_transfer(destination="amq.topic", content=message) + +# europe.news + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "europe.news" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "europe.news" +session.message_transfer(destination="amq.topic", content=message) + + +# europe.weather + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "europe.weather" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "europe.weather" +session.message_transfer(destination="amq.topic", content=message) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. +# +# Close Channel 1. + + +session.session_close() + diff --git a/python/examples/request-response/client.py b/python/examples/request-response/client.py new file mode 100644 index 0000000000..23e9310509 --- /dev/null +++ b/python/examples/request-response/client.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +""" + client.py + + Client for a client/server example + +""" + +import base64 + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call basic_consume() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as basic_consume() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + message = 0 + + while True: + try: + message = queue.get(timeout=10) + content = message.content.body + print "Response: " + content + except Empty: + print "No more messages!" + break + except: + print "Unexpected exception!" + break + + + # Messages are not removed from the queue until they + # are acknowledged. Using multiple=True, all messages + # in the channel up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +spec = qpid.spec.load(amqp_spec) +client = Client(host, port, spec) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Main Body -- ---------------------------------------- + +# Create a response queue for the server to send responses to. Use the +# same string as the name of the queue and the name of the routing +# key. + +replyTo = "ReplyTo:" # + base64.urlsafe_b64encode(session.session_id) +session.queue_declare(queue=replyTo, exclusive=True) +session.queue_bind(exchange="amq.direct", queue=replyTo, routing_key=replyTo) + +# Send some messages to the server's request queue + +lines = ["Twas brilling, and the slithy toves", + "Did gyre and gimble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."] + +for l in lines: + print "Request: " + l + request=Content(l) + request["routing_key"] = "request" + request["reply_to"] = client.spec.struct("reply_to") + request["reply_to"]["exchange_name"] = "amq.direct" + request["reply_to"]["routing_key"] = replyTo + session.message_transfer(destination="amq.direct", content=request) + +# Now see what messages the server sent to our replyTo queue + +dump_queue(client, replyTo) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.session_close() diff --git a/python/examples/request-response/server.py b/python/examples/request-response/server.py new file mode 100644 index 0000000000..dd81b419e8 --- /dev/null +++ b/python/examples/request-response/server.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +""" + server.py + + Server for a client/server example +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def respond(session, request): + + # The routing key for the response is the request's reply-to + # property. The body for the response is the request's body, + # converted to upper case. + + response=Content(request.body.upper()) + response["routing_key"] = request["reply_to"]["routing_key"] + + session.message_transfer(destination=request["reply_to"]["exchange_name"], content=response) + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +# Open Channel 1 so we can use it to manage our queue. + +session = client.session() +session.session_open() # keep the session object, we'll need the session id + +#----- Main Body -- ---------------------------------------- + +# Create a request queue and subscribe to it + +session.queue_declare(queue="request", exclusive=True) +session.queue_bind(exchange="amq.direct", queue="request", routing_key="request") + +dest = "request_destination" + +session.message_subscribe(queue="request", destination=dest) +session.message_flow(dest, 0, 0xFFFFFFFF) +session.message_flow(dest, 1, 0xFFFFFFFF) + + +# Remind the user to start the client program + +print "Request server running - run your client now." +print "(Times out after 100 seconds ...)" + +# Respond to each request + +queue = client.queue(dest) + +# If we get a message, send it back to the user (as indicated in the +# ReplyTo property) + +while True: + try: + request = queue.get(timeout=100) + respond(session, request.content) + request.complete() + except Empty: + print "No more messages!" + break; + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# +# Close Channel 1. + +session.session_close() + |