From 46989a4ce0c90b73dbcb76cb344c95b0de27d360 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 5 Dec 2007 16:22:50 +0000 Subject: Renamed for consistency with c++ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@601394 13f79535-47bb-0310-9956-ffa450edef68 --- .../examples/direct/config_direct_exchange.py | 53 --------- .../python/examples/direct/direct_config_queues.py | 53 +++++++++ qpid/python/examples/direct/direct_consumer.py | 75 ------------- qpid/python/examples/direct/direct_producer.py | 51 --------- qpid/python/examples/direct/direct_publisher.py | 51 +++++++++ qpid/python/examples/direct/listener.py | 75 +++++++++++++ .../examples/fanout/config_fanout_exchange.py | 54 ---------- .../python/examples/fanout/fanout_config_queues.py | 54 ++++++++++ qpid/python/examples/fanout/fanout_consumer.py | 75 ------------- qpid/python/examples/fanout/fanout_producer.py | 48 --------- qpid/python/examples/fanout/fanout_publisher.py | 48 +++++++++ qpid/python/examples/fanout/listener.py | 75 +++++++++++++ qpid/python/examples/pubsub/topic_consumer.py | 118 --------------------- qpid/python/examples/pubsub/topic_listener.py | 118 +++++++++++++++++++++ qpid/python/examples/pubsub/topic_producer.py | 96 ----------------- qpid/python/examples/pubsub/topic_publisher.py | 96 +++++++++++++++++ 16 files changed, 570 insertions(+), 570 deletions(-) delete mode 100644 qpid/python/examples/direct/config_direct_exchange.py create mode 100644 qpid/python/examples/direct/direct_config_queues.py delete mode 100644 qpid/python/examples/direct/direct_consumer.py delete mode 100644 qpid/python/examples/direct/direct_producer.py create mode 100644 qpid/python/examples/direct/direct_publisher.py create mode 100644 qpid/python/examples/direct/listener.py delete mode 100644 qpid/python/examples/fanout/config_fanout_exchange.py create mode 100644 qpid/python/examples/fanout/fanout_config_queues.py delete mode 100644 qpid/python/examples/fanout/fanout_consumer.py delete mode 100644 qpid/python/examples/fanout/fanout_producer.py create mode 100644 qpid/python/examples/fanout/fanout_publisher.py create mode 100644 qpid/python/examples/fanout/listener.py delete mode 100644 qpid/python/examples/pubsub/topic_consumer.py create mode 100644 qpid/python/examples/pubsub/topic_listener.py delete mode 100644 qpid/python/examples/pubsub/topic_producer.py create mode 100644 qpid/python/examples/pubsub/topic_publisher.py (limited to 'qpid/python/examples') diff --git a/qpid/python/examples/direct/config_direct_exchange.py b/qpid/python/examples/direct/config_direct_exchange.py deleted file mode 100644 index e64ad678b8..0000000000 --- a/qpid/python/examples/direct/config_direct_exchange.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -""" - config_direct_exchange.py - - Creates and binds a queue on an AMQP direct exchange. - - All messages using the routing key "routing_key" are - sent to the queue named "message_queue". -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Create a queue ------------------------------------- - -# Create a queue named "listener" on channel 1, and bind it -# to the "amq.direct" exchange. -# -# queue_declare() creates an AMQP queue, which is held -# on the broker. Published messages are sent to the AMQP queue, -# from which messages are delivered to consumers. -# -# queue_bind() determines which messages are routed to a queue. -# Route all messages with the routing key "routing_key" to -# the AMQP queue named "message_queue". - -session.queue_declare(queue="message_queue") -session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key") - -#----- Cleanup --------------------------------------------- - -session.session_close() - diff --git a/qpid/python/examples/direct/direct_config_queues.py b/qpid/python/examples/direct/direct_config_queues.py new file mode 100644 index 0000000000..e64ad678b8 --- /dev/null +++ b/qpid/python/examples/direct/direct_config_queues.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +""" + config_direct_exchange.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Create a queue ------------------------------------- + +# Create a queue named "listener" on channel 1, and bind it +# to the "amq.direct" exchange. +# +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key") + +#----- Cleanup --------------------------------------------- + +session.session_close() + diff --git a/qpid/python/examples/direct/direct_consumer.py b/qpid/python/examples/direct/direct_consumer.py deleted file mode 100644 index 38b1ba30a0..0000000000 --- a/qpid/python/examples/direct/direct_consumer.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python -""" - direct_consumer.py - - This AMQP client reads messages from a message - queue named "message_queue". -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - - -#----- Initialization -------------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Read from queue -------------------------------------------- - -# Now let's create a local client queue and tell it to read -# incoming messages. - -# The consumer tag identifies the client-side queue. - -consumer_tag = "consumer1" -queue = client.queue(consumer_tag) - -# Call message_consume() to tell the broker to deliver messages -# from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_consume() is called. - -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) # Kill these? -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these? - -# Initialize 'final' and 'content', variables used to identify the last message. - -final = "That's all, folks!" # In a message body, signals the last message -content = "" # Content of the last message read - -message = None -while content != final: - message = queue.get(timeout=10) - content = message.content.body - print content - -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. - -message.complete(cumulative=True) - -#----- Cleanup ------------------------------------------------ - -# Clean up before exiting so there are no open threads. -# - -session.session_close() diff --git a/qpid/python/examples/direct/direct_producer.py b/qpid/python/examples/direct/direct_producer.py deleted file mode 100644 index 6770e56803..0000000000 --- a/qpid/python/examples/direct/direct_producer.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -""" - direct_producer.py - - Publishes messages to an AMQP direct exchange, using - the routing key "routing_key" -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Publish some messages ------------------------------ - -# Create some messages and put them on the broker. - -for i in range(10): - message = Content("message " + str(i)) - message["routing_key"] = "routing_key" - session.message_transfer(destination="amq.direct", content=message) - -final="That's all, folks!" -message = Content(final) -message["routing_key"] = "routing_key" -session.message_transfer(destination="amq.direct", content=message) - -#----- Cleanup -------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() - diff --git a/qpid/python/examples/direct/direct_publisher.py b/qpid/python/examples/direct/direct_publisher.py new file mode 100644 index 0000000000..6770e56803 --- /dev/null +++ b/qpid/python/examples/direct/direct_publisher.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +for i in range(10): + message = Content("message " + str(i)) + message["routing_key"] = "routing_key" + session.message_transfer(destination="amq.direct", content=message) + +final="That's all, folks!" +message = Content(final) +message["routing_key"] = "routing_key" +session.message_transfer(destination="amq.direct", content=message) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() + diff --git a/qpid/python/examples/direct/listener.py b/qpid/python/examples/direct/listener.py new file mode 100644 index 0000000000..38b1ba30a0 --- /dev/null +++ b/qpid/python/examples/direct/listener.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +consumer_tag = "consumer1" +queue = client.queue(consumer_tag) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=consumer_tag) +session.message_flow(consumer_tag, 0, 0xFFFFFFFF) # Kill these? +session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these? + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.content.body + print content + +# Messages are not removed from the queue until they are +# acknowledged. Using cumulative=True, all messages from the session +# up to and including the one identified by the delivery tag are +# acknowledged. This is more efficient, because there are fewer +# network round-trips. + +message.complete(cumulative=True) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.session_close() diff --git a/qpid/python/examples/fanout/config_fanout_exchange.py b/qpid/python/examples/fanout/config_fanout_exchange.py deleted file mode 100644 index 3315f5bc14..0000000000 --- a/qpid/python/examples/fanout/config_fanout_exchange.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -""" - config_direct_exchange.py - - Creates and binds a queue on an AMQP direct exchange. - - All messages using the routing key "routing_key" are - sent to the queue named "message_queue". -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Create a queue ------------------------------------- - -# Create a queue named "listener" on channel 1, and bind it -# to the "amq.fanout" exchange. -# -# queue_declare() creates an AMQP queue, which is held -# on the broker. Published messages are sent to the AMQP queue, -# from which messages are delivered to consumers. -# -# queue_bind() determines which messages are routed to a queue. -# Route all messages with the routing key "routing_key" to -# the AMQP queue named "message_queue". - -session.queue_declare(queue="message_queue") -session.queue_bind(exchange="amq.fanout", queue="message_queue") - -#----- Cleanup --------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/fanout/fanout_config_queues.py b/qpid/python/examples/fanout/fanout_config_queues.py new file mode 100644 index 0000000000..3315f5bc14 --- /dev/null +++ b/qpid/python/examples/fanout/fanout_config_queues.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +""" + config_direct_exchange.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Create a queue ------------------------------------- + +# Create a queue named "listener" on channel 1, and bind it +# to the "amq.fanout" exchange. +# +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.queue_bind(exchange="amq.fanout", queue="message_queue") + +#----- Cleanup --------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() diff --git a/qpid/python/examples/fanout/fanout_consumer.py b/qpid/python/examples/fanout/fanout_consumer.py deleted file mode 100644 index e9a2291f4c..0000000000 --- a/qpid/python/examples/fanout/fanout_consumer.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python -""" - direct_consumer.py - - This AMQP client reads messages from a message - queue named "message_queue". -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - - -#----- Initialization -------------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Read from queue -------------------------------------------- - -# Now let's create a local client queue and tell it to read -# incoming messages. - -# The consumer tag identifies the client-side queue. - -consumer_tag = "consumer1" -queue = client.queue(consumer_tag) - -# Call message_consume() to tell the broker to deliver messages -# from the AMQP queue to this local client queue. The broker will -# start delivering messages as soon as message_consume() is called. - -session.message_subscribe(queue="message_queue", destination=consumer_tag) -session.message_flow(consumer_tag, 0, 0xFFFFFFFF) -session.message_flow(consumer_tag, 1, 0xFFFFFFFF) - -# Initialize 'final' and 'content', variables used to identify the last message. - -final = "That's all, folks!" # In a message body, signals the last message -content = "" # Content of the last message read - -message = None -while content != final: - message = queue.get(timeout=10) - content = message.content.body - print content - -# Messages are not removed from the queue until they are -# acknowledged. Using cumulative=True, all messages from the session -# up to and including the one identified by the delivery tag are -# acknowledged. This is more efficient, because there are fewer -# network round-trips. - -message.complete(cumulative=True) - -#----- Cleanup ------------------------------------------------ - -# Clean up before exiting so there are no open threads. -# - -session.session_close() diff --git a/qpid/python/examples/fanout/fanout_producer.py b/qpid/python/examples/fanout/fanout_producer.py deleted file mode 100644 index 92ca7b7ec0..0000000000 --- a/qpid/python/examples/fanout/fanout_producer.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -""" - direct_producer.py - - Publishes messages to an AMQP direct exchange, using - the routing key "routing_key" -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -client = Client(host, port, qpid.spec.load(amqp_spec)) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Publish some messages ------------------------------ - -# Create some messages and put them on the broker. - -for i in range(10): - message = Content(body="message " + str(i)) - session.message_transfer(destination="amq.fanout", content=message) - -final="That's all, folks!" -message=Content(final) -session.message_transfer(destination="amq.fanout", content=message) - -#----- Cleanup -------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/fanout/fanout_publisher.py b/qpid/python/examples/fanout/fanout_publisher.py new file mode 100644 index 0000000000..92ca7b7ec0 --- /dev/null +++ b/qpid/python/examples/fanout/fanout_publisher.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +for i in range(10): + message = Content(body="message " + str(i)) + session.message_transfer(destination="amq.fanout", content=message) + +final="That's all, folks!" +message=Content(final) +session.message_transfer(destination="amq.fanout", content=message) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() diff --git a/qpid/python/examples/fanout/listener.py b/qpid/python/examples/fanout/listener.py new file mode 100644 index 0000000000..e9a2291f4c --- /dev/null +++ b/qpid/python/examples/fanout/listener.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +client = Client(host, port, qpid.spec.load(amqp_spec)) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +consumer_tag = "consumer1" +queue = client.queue(consumer_tag) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_consume() is called. + +session.message_subscribe(queue="message_queue", destination=consumer_tag) +session.message_flow(consumer_tag, 0, 0xFFFFFFFF) +session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.content.body + print content + +# Messages are not removed from the queue until they are +# acknowledged. Using cumulative=True, all messages from the session +# up to and including the one identified by the delivery tag are +# acknowledged. This is more efficient, because there are fewer +# network round-trips. + +message.complete(cumulative=True) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.session_close() diff --git a/qpid/python/examples/pubsub/topic_consumer.py b/qpid/python/examples/pubsub/topic_consumer.py deleted file mode 100644 index afe8bba91e..0000000000 --- a/qpid/python/examples/pubsub/topic_consumer.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python -""" - topic_consumer.py - - This AMQP client reads all messages from the - "news", "weather", "usa", and "europe" queues - created and bound by config_topic_exchange.py. -""" - -import base64 - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Functions ------------------------------------------- - -def dump_queue(client, queue_name): - - print "Messages queue: " + queue_name - - consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag - queue = client.queue(consumer_tag) - - # Call basic_consume() to tell the broker to deliver messages - # from the AMQP queue to a local client queue. The broker will - # start delivering messages as soon as basic_consume() is called. - - session.message_subscribe(queue=queue_name, destination=consumer_tag) - session.message_flow(consumer_tag, 0, 0xFFFFFFFF) - session.message_flow(consumer_tag, 1, 0xFFFFFFFF) - - content = "" # Content of the last message read - final = "That's all, folks!" # In a message body, signals the last message - message = 0 - - while content != final: - try: - message = queue.get() - content = message.content.body - print content - except Empty: - if message != 0: - message.complete(cumulative=True) - print "No more messages!" - return - - - # Messages are not removed from the queue until they - # are acknowledged. Using multiple=True, all messages - # in the channel up to and including the one identified - # by the delivery tag are acknowledged. This is more efficient, - # because there are fewer network round-trips. - - if message != 0: - message.complete(cumulative=True) - - -#----- Initialization -------------------------------------- - -# Set parameters for login - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -spec = qpid.spec.load(amqp_spec) -client = Client(host, port, spec) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session_info = session.session_open() -session_id = session_info.session_id - -#----- Main Body -- ---------------------------------------- - - -news = "news" + base64.urlsafe_b64encode(session_id) -weather = "weather" + base64.urlsafe_b64encode(session_id) -usa = "usa" + base64.urlsafe_b64encode(session_id) -europe = "europe" + base64.urlsafe_b64encode(session_id) - -session.queue_declare(queue=news, exclusive=True) -session.queue_declare(queue=weather, exclusive=True) -session.queue_declare(queue=usa, exclusive=True) -session.queue_declare(queue=europe, exclusive=True) - -# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather". - -# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches -# "europe.news" or "usa.news". - -session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news") -session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather") -session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#") -session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#") - -# Remind the user to start the topic producer - -print "Queues create - please start the topic producer" - -# Call dump_queue to print messages from each queue - -dump_queue(client, news) -dump_queue(client, weather) -dump_queue(client, usa) -dump_queue(client, europe) - -#----- Cleanup ------------------------------------------------ - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/pubsub/topic_listener.py b/qpid/python/examples/pubsub/topic_listener.py new file mode 100644 index 0000000000..afe8bba91e --- /dev/null +++ b/qpid/python/examples/pubsub/topic_listener.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +""" + topic_consumer.py + + This AMQP client reads all messages from the + "news", "weather", "usa", and "europe" queues + created and bound by config_topic_exchange.py. +""" + +import base64 + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(client, queue_name): + + print "Messages queue: " + queue_name + + consumer_tag = queue_name # Use the queue name as the consumer tag - need a unique tag + queue = client.queue(consumer_tag) + + # Call basic_consume() to tell the broker to deliver messages + # from the AMQP queue to a local client queue. The broker will + # start delivering messages as soon as basic_consume() is called. + + session.message_subscribe(queue=queue_name, destination=consumer_tag) + session.message_flow(consumer_tag, 0, 0xFFFFFFFF) + session.message_flow(consumer_tag, 1, 0xFFFFFFFF) + + content = "" # Content of the last message read + final = "That's all, folks!" # In a message body, signals the last message + message = 0 + + while content != final: + try: + message = queue.get() + content = message.content.body + print content + except Empty: + if message != 0: + message.complete(cumulative=True) + print "No more messages!" + return + + + # Messages are not removed from the queue until they + # are acknowledged. Using multiple=True, all messages + # in the channel up to and including the one identified + # by the delivery tag are acknowledged. This is more efficient, + # because there are fewer network round-trips. + + if message != 0: + message.complete(cumulative=True) + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +spec = qpid.spec.load(amqp_spec) +client = Client(host, port, spec) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session_info = session.session_open() +session_id = session_info.session_id + +#----- Main Body -- ---------------------------------------- + + +news = "news" + base64.urlsafe_b64encode(session_id) +weather = "weather" + base64.urlsafe_b64encode(session_id) +usa = "usa" + base64.urlsafe_b64encode(session_id) +europe = "europe" + base64.urlsafe_b64encode(session_id) + +session.queue_declare(queue=news, exclusive=True) +session.queue_declare(queue=weather, exclusive=True) +session.queue_declare(queue=usa, exclusive=True) +session.queue_declare(queue=europe, exclusive=True) + +# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather". + +# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches +# "europe.news" or "usa.news". + +session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news") +session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather") +session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#") +session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#") + +# Remind the user to start the topic producer + +print "Queues create - please start the topic producer" + +# Call dump_queue to print messages from each queue + +dump_queue(client, news) +dump_queue(client, weather) +dump_queue(client, usa) +dump_queue(client, europe) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.session_close() diff --git a/qpid/python/examples/pubsub/topic_producer.py b/qpid/python/examples/pubsub/topic_producer.py deleted file mode 100644 index c3b13cd82c..0000000000 --- a/qpid/python/examples/pubsub/topic_producer.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -""" - topic_producer.py - - This is a simple AMQP publisher application that uses a - Topic exchange. The publisher specifies the routing key - and the exchange for each message. -""" - -import qpid -from qpid.client import Client -from qpid.content import Content -from qpid.queue import Empty - -#----- Initialization ----------------------------------- - -# Set parameters for login. - -host="127.0.0.1" -port=5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" -user="guest" -password="guest" - -# Create a client and log in to it. - -spec = qpid.spec.load(amqp_spec) -client = Client(host, port, spec) -client.start({"LOGIN": user, "PASSWORD": password}) - -session = client.session() -session.session_open() - -#----- Publish some messages ------------------------------ - -# Create some messages and put them on the broker. Use the -# topic exchange. The routing keys are "usa.news", "usa.weather", -# "europe.news", and "europe.weather". - -final = "That's all, folks!" - -# We'll use the same routing key for all messages in the loop, and -# also for the terminating message. - -# usa.news - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "usa.news" - session.message_transfer(destination="amq.topic", content=message) - -message = Content(final) -message["routing_key"] = "usa.news" -session.message_transfer(destination="amq.topic", content=message) - -# usa.weather - - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "usa.weather" - session.message_transfer(destination="amq.topic", content=message) - -message = Content(final) -message["routing_key"] = "usa.weather" -session.message_transfer(destination="amq.topic", content=message) - -# europe.news - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "europe.news" - session.message_transfer(destination="amq.topic", content=message) - -message = Content(final) -message["routing_key"] = "europe.news" -session.message_transfer(destination="amq.topic", content=message) - - -# europe.weather - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "europe.weather" - session.message_transfer(destination="amq.topic", content=message) - -message = Content(final) -message["routing_key"] = "europe.weather" -session.message_transfer(destination="amq.topic", content=message) - - -#----- Cleanup -------------------------------------------- - -# Clean up before exiting so there are no open threads. - -session.session_close() diff --git a/qpid/python/examples/pubsub/topic_publisher.py b/qpid/python/examples/pubsub/topic_publisher.py new file mode 100644 index 0000000000..c3b13cd82c --- /dev/null +++ b/qpid/python/examples/pubsub/topic_publisher.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +""" + topic_producer.py + + This is a simple AMQP publisher application that uses a + Topic exchange. The publisher specifies the routing key + and the exchange for each message. +""" + +import qpid +from qpid.client import Client +from qpid.content import Content +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login. + +host="127.0.0.1" +port=5672 +amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" +user="guest" +password="guest" + +# Create a client and log in to it. + +spec = qpid.spec.load(amqp_spec) +client = Client(host, port, spec) +client.start({"LOGIN": user, "PASSWORD": password}) + +session = client.session() +session.session_open() + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. Use the +# topic exchange. The routing keys are "usa.news", "usa.weather", +# "europe.news", and "europe.weather". + +final = "That's all, folks!" + +# We'll use the same routing key for all messages in the loop, and +# also for the terminating message. + +# usa.news + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "usa.news" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "usa.news" +session.message_transfer(destination="amq.topic", content=message) + +# usa.weather + + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "usa.weather" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "usa.weather" +session.message_transfer(destination="amq.topic", content=message) + +# europe.news + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "europe.news" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "europe.news" +session.message_transfer(destination="amq.topic", content=message) + + +# europe.weather + +for i in range(5): + message = Content("message " + str(i)) + message["routing_key"] = "europe.weather" + session.message_transfer(destination="amq.topic", content=message) + +message = Content(final) +message["routing_key"] = "europe.weather" +session.message_transfer(destination="amq.topic", content=message) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.session_close() -- cgit v1.2.1