diff options
Diffstat (limited to 'RC9/qpid/python/examples/pubsub')
-rwxr-xr-x | RC9/qpid/python/examples/pubsub/topic_publisher.py | 92 | ||||
-rwxr-xr-x | RC9/qpid/python/examples/pubsub/topic_subscriber.py | 154 | ||||
-rw-r--r-- | RC9/qpid/python/examples/pubsub/verify | 23 | ||||
-rw-r--r-- | RC9/qpid/python/examples/pubsub/verify.in | 55 |
4 files changed, 324 insertions, 0 deletions
diff --git a/RC9/qpid/python/examples/pubsub/topic_publisher.py b/RC9/qpid/python/examples/pubsub/topic_publisher.py new file mode 100755 index 0000000000..b50d5fa8ca --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/topic_publisher.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + topic_publisher.py + + This is a simple AMQP publisher application that uses a + Topic exchange. The publisher specifies the routing key + and the exchange for each message. +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ---------------------------------------- + +def send_msg(routing_key): + props = session.delivery_properties(routing_key=routing_key) + for i in range(5): + session.message_transfer(destination="amq.topic", message=Message(props,routing_key + " " + str(i))) + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. Use the +# topic exchange. The routing keys are "usa.news", "usa.weather", +# "europe.news", and "europe.weather". + +# usa.news +send_msg("usa.news") + +# usa.weather +send_msg("usa.weather") + +# europe.news +send_msg("europe.news") + +# europe.weather +send_msg("europe.weather") + +# Signal termination +props = session.delivery_properties(routing_key="control") +session.message_transfer(destination="amq.topic", message=Message(props,"That's all, folks!")) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/pubsub/topic_subscriber.py b/RC9/qpid/python/examples/pubsub/topic_subscriber.py new file mode 100755 index 0000000000..489c7cbb19 --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/topic_subscriber.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + topic_subscriber.py + + This subscriber creates private queues and binds them + to the topics 'usa.#', 'europe.#', '#.news', and '#.weather'. +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(queue): + + content = "" # Content of the last message read + final = "That's all, folks!" # In a message body, signals the last message + message = 0 + + while content != final: + try: + message = queue.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + except Empty: + print "No more messages!" + return + + + +def subscribe_queue(server_queue_name, local_queue_name): + + print "Subscribing local queue '" + local_queue_name + "' to " + server_queue_name + "'" + + queue = session.incoming(local_queue_name) + + session.message_subscribe(queue=server_queue_name, destination=local_queue_name) + queue.start() + + return queue + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Main Body -- ---------------------------------------- + +# declare queues on the server + +news = "news-" + session.name +weather = "weather-" + session.name +usa = "usa-" + session.name +europe = "europe-" + session.name + +session.queue_declare(queue=news, exclusive=True) +session.queue_declare(queue=weather, exclusive=True) +session.queue_declare(queue=usa, exclusive=True) +session.queue_declare(queue=europe, exclusive=True) + +# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather". + +# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches +# "europe.news" or "usa.news". + +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="#.news") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="#.weather") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="usa.#") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="europe.#") + +# Bind each queue to the control queue so we know when to stop + +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="control") + +# Remind the user to start the topic producer + +print "Queues created - please start the topic producer" +sys.stdout.flush() + +# Subscribe local queues to server queues + +local_news = "local_news" +local_weather = "local_weather" +local_usa = "local_usa" +local_europe = "local_europe" + +local_news_queue = subscribe_queue(news, local_news) +local_weather_queue = subscribe_queue(weather, local_weather) +local_usa_queue = subscribe_queue(usa, local_usa) +local_europe_queue = subscribe_queue(europe, local_europe) + +# Call dump_queue to print messages from each queue + +print "Messages on 'news' queue:" +dump_queue(local_news_queue) + +print "Messages on 'weather' queue:" +dump_queue(local_weather_queue) + +print "Messages on 'usa' queue:" +dump_queue(local_usa_queue) + +print "Messages on 'europe' queue:" +dump_queue(local_europe_queue) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/pubsub/verify b/RC9/qpid/python/examples/pubsub/verify new file mode 100644 index 0000000000..cf1bade62e --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/verify @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Queues created" ./topic_subscriber.py +clients ./topic_publisher.py +outputs ./topic_publisher.py.out "topic_subscriber.py.out | remove_uuid | sort" diff --git a/RC9/qpid/python/examples/pubsub/verify.in b/RC9/qpid/python/examples/pubsub/verify.in new file mode 100644 index 0000000000..1b74acd832 --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/verify.in @@ -0,0 +1,55 @@ +==== topic_publisher.py.out +==== topic_subscriber.py.out | remove_uuid | sort +europe.news 0 +europe.news 0 +europe.news 1 +europe.news 1 +europe.news 2 +europe.news 2 +europe.news 3 +europe.news 3 +europe.news 4 +europe.news 4 +europe.weather 0 +europe.weather 0 +europe.weather 1 +europe.weather 1 +europe.weather 2 +europe.weather 2 +europe.weather 3 +europe.weather 3 +europe.weather 4 +europe.weather 4 +Messages on 'europe' queue: +Messages on 'news' queue: +Messages on 'usa' queue: +Messages on 'weather' queue: +Queues created - please start the topic producer +Subscribing local queue 'local_europe' to europe-' +Subscribing local queue 'local_news' to news-' +Subscribing local queue 'local_usa' to usa-' +Subscribing local queue 'local_weather' to weather-' +That's all, folks! +That's all, folks! +That's all, folks! +That's all, folks! +usa.news 0 +usa.news 0 +usa.news 1 +usa.news 1 +usa.news 2 +usa.news 2 +usa.news 3 +usa.news 3 +usa.news 4 +usa.news 4 +usa.weather 0 +usa.weather 0 +usa.weather 1 +usa.weather 1 +usa.weather 2 +usa.weather 2 +usa.weather 3 +usa.weather 3 +usa.weather 4 +usa.weather 4 |