diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/python/examples | |
parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
download | qpid-python-M4.tar.gz |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/python/examples')
25 files changed, 1750 insertions, 0 deletions
diff --git a/RC9/qpid/python/examples/direct/declare_queues.py b/RC9/qpid/python/examples/direct/declare_queues.py new file mode 100755 index 0000000000..13818ee9d7 --- /dev/null +++ b/RC9/qpid/python/examples/direct/declare_queues.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + declare_queues.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +# Common includes + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Create a queue ------------------------------------- + +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# exchange_bind() determines which messages are routed to a queue. +# Route all messages with the binding key "routing_key" to +# the AMQP queue named "message_queue". + +session.queue_declare(queue="message_queue") +session.exchange_bind(exchange="amq.direct", queue="message_queue", binding_key="routing_key") + +#----- Cleanup --------------------------------------------- + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/direct/direct_consumer.py b/RC9/qpid/python/examples/direct/direct_consumer.py new file mode 100755 index 0000000000..b07e53c5c7 --- /dev/null +++ b/RC9/qpid/python/examples/direct/direct_consumer.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +import sys +import os +from random import randint +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as credit is allocated using +# queue.start(). + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +queue.start() + +# Initialize 'final' and 'content', variables used to identify the last message. + +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +message = None +while content != final: + message = queue.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/direct/direct_producer.py b/RC9/qpid/python/examples/direct/direct_producer.py new file mode 100755 index 0000000000..fcbb4675e4 --- /dev/null +++ b/RC9/qpid/python/examples/direct/direct_producer.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + direct_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message +from qpid.datatypes import uuid4 +from qpid.queue import Empty + + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. +props = session.delivery_properties(routing_key="routing_key") + +for i in range(10): + session.message_transfer(destination="amq.direct", message=Message(props,"message " + str(i))) + +session.message_transfer(destination="amq.direct", message=Message(props,"That's all, folks!")) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/direct/listener.py b/RC9/qpid/python/examples/direct/listener.py new file mode 100755 index 0000000000..9d06bd3929 --- /dev/null +++ b/RC9/qpid/python/examples/direct/listener.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + listener.py + + This AMQP client reads messages from a message + queue named "message_queue". It is implemented + as a message listener. +""" + +# Common includes + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +# Includes specific to this example + +from time import sleep + + +#----- Message Receive Handler ----------------------------- +class Receiver: + def __init__ (self): + self.finalReceived = False + + def isFinal (self): + return self.finalReceived + + def Handler (self, message): + content = message.body + session.message_accept(RangedSet(message.id)) + print content + if content == "That's all, folks!": + self.finalReceived = True + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The local_queue_name identifies the client-side queue. + +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as message_subscribe() is called. + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +queue.start() + +receiver = Receiver() +queue.listen (receiver.Handler) + +while not receiver.isFinal() : + sleep (1) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/direct/verify b/RC9/qpid/python/examples/direct/verify new file mode 100644 index 0000000000..92f87bf827 --- /dev/null +++ b/RC9/qpid/python/examples/direct/verify @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +clients ./declare_queues.py ./direct_producer.py ./direct_consumer.py +outputs ./declare_queues.py.out ./direct_producer.py.out ./direct_consumer.py.out diff --git a/RC9/qpid/python/examples/direct/verify.in b/RC9/qpid/python/examples/direct/verify.in new file mode 100644 index 0000000000..5e691619d9 --- /dev/null +++ b/RC9/qpid/python/examples/direct/verify.in @@ -0,0 +1,14 @@ +==== declare_queues.py.out +==== direct_producer.py.out +==== direct_consumer.py.out +message 0 +message 1 +message 2 +message 3 +message 4 +message 5 +message 6 +message 7 +message 8 +message 9 +That's all, folks! diff --git a/RC9/qpid/python/examples/fanout/fanout_consumer.py b/RC9/qpid/python/examples/fanout/fanout_consumer.py new file mode 100755 index 0000000000..0452baa8da --- /dev/null +++ b/RC9/qpid/python/examples/fanout/fanout_consumer.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + fanout_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Initialization -------------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Main Body ------------------------------------------- + +# Create a server-side queue and route messages to it. +# The server-side queue must have a unique name. Use the +# session id for that. +server_queue_name = session.name +session.queue_declare(queue=server_queue_name) +session.exchange_bind(queue=server_queue_name, exchange="amq.fanout") + +# Create a local queue to receive messages from the server-side +# queue. +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the server to deliver messages +# from the AMQP queue to this local client queue. + +session.message_subscribe(queue=server_queue_name, destination=local_queue_name) +local_queue.start() + +print "Subscribed to queue " + server_queue_name +sys.stdout.flush() + +# Initialize 'final' and 'content', variables used to identify the last message. +final = "That's all, folks!" # In a message body, signals the last message +content = "" # Content of the last message read + +# Read the messages - acknowledge each one +message = None +while content != final: + message = local_queue.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/fanout/fanout_producer.py b/RC9/qpid/python/examples/fanout/fanout_producer.py new file mode 100755 index 0000000000..c4df252c70 --- /dev/null +++ b/RC9/qpid/python/examples/fanout/fanout_producer.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + fanout_producer.py + + Publishes messages to an AMQP direct exchange, using + the routing key "routing_key" +""" +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, uuid4 +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +delivery_properties = session.delivery_properties(routing_key="routing_key") + +for i in range(10): + session.message_transfer(destination="amq.fanout", message=Message(delivery_properties,"message " + str(i))) + +session.message_transfer(destination="amq.fanout", message=Message(delivery_properties, "That's all, folks!")) + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/fanout/listener.py b/RC9/qpid/python/examples/fanout/listener.py new file mode 100755 index 0000000000..29db402e9d --- /dev/null +++ b/RC9/qpid/python/examples/fanout/listener.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + listener.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +# + +from time import sleep + +#----- Message Receive Handler ----------------------------- +class Receiver: + def __init__ (self): + self.finalReceived = False + + def isFinal (self): + return self.finalReceived + + def Handler (self, message): + content = message.body + session.message_accept(RangedSet(message.id)) + print content + if content == "That's all, folks!": + self.finalReceived = True + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Create a server-side queue and route messages to it. +# The server-side queue must have a unique name. Use the +# session id for that. + +server_queue_name = session.name +session.queue_declare(queue=server_queue_name) +session.exchange_bind(queue=server_queue_name, exchange="amq.fanout") + +# Create a local queue to receive messages from the server-side +# queue. +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + + +# The local queue name identifies the client-side queue. + +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as local_queue.start() is called. + +session.message_subscribe(queue=server_queue_name, destination=local_queue_name) +local_queue.start() + +receiver = Receiver () +local_queue.listen (receiver.Handler) + +while not receiver.isFinal (): + sleep (1) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close() diff --git a/RC9/qpid/python/examples/fanout/verify b/RC9/qpid/python/examples/fanout/verify new file mode 100644 index 0000000000..9e5c364bfa --- /dev/null +++ b/RC9/qpid/python/examples/fanout/verify @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Subscribed" ./fanout_consumer.py +background "Subscribed" ./fanout_consumer.py +clients ./fanout_producer.py +outputs ./fanout_producer.py.out "./fanout_consumer.py.out | remove_uuid" "./fanout_consumer.pyX.out | remove_uuid" diff --git a/RC9/qpid/python/examples/fanout/verify.in b/RC9/qpid/python/examples/fanout/verify.in new file mode 100644 index 0000000000..d4b8670de9 --- /dev/null +++ b/RC9/qpid/python/examples/fanout/verify.in @@ -0,0 +1,27 @@ +==== fanout_producer.py.out +==== fanout_consumer.py.out | remove_uuid +Subscribed to queue +message 0 +message 1 +message 2 +message 3 +message 4 +message 5 +message 6 +message 7 +message 8 +message 9 +That's all, folks! +==== fanout_consumer.pyX.out | remove_uuid +Subscribed to queue +message 0 +message 1 +message 2 +message 3 +message 4 +message 5 +message 6 +message 7 +message 8 +message 9 +That's all, folks! diff --git a/RC9/qpid/python/examples/pubsub/topic_publisher.py b/RC9/qpid/python/examples/pubsub/topic_publisher.py new file mode 100755 index 0000000000..b50d5fa8ca --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/topic_publisher.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + topic_publisher.py + + This is a simple AMQP publisher application that uses a + Topic exchange. The publisher specifies the routing key + and the exchange for each message. +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ---------------------------------------- + +def send_msg(routing_key): + props = session.delivery_properties(routing_key=routing_key) + for i in range(5): + session.message_transfer(destination="amq.topic", message=Message(props,routing_key + " " + str(i))) + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. Use the +# topic exchange. The routing keys are "usa.news", "usa.weather", +# "europe.news", and "europe.weather". + +# usa.news +send_msg("usa.news") + +# usa.weather +send_msg("usa.weather") + +# europe.news +send_msg("europe.news") + +# europe.weather +send_msg("europe.weather") + +# Signal termination +props = session.delivery_properties(routing_key="control") +session.message_transfer(destination="amq.topic", message=Message(props,"That's all, folks!")) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/pubsub/topic_subscriber.py b/RC9/qpid/python/examples/pubsub/topic_subscriber.py new file mode 100755 index 0000000000..489c7cbb19 --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/topic_subscriber.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + topic_subscriber.py + + This subscriber creates private queues and binds them + to the topics 'usa.#', 'europe.#', '#.news', and '#.weather'. +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(queue): + + content = "" # Content of the last message read + final = "That's all, folks!" # In a message body, signals the last message + message = 0 + + while content != final: + try: + message = queue.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print content + except Empty: + print "No more messages!" + return + + + +def subscribe_queue(server_queue_name, local_queue_name): + + print "Subscribing local queue '" + local_queue_name + "' to " + server_queue_name + "'" + + queue = session.incoming(local_queue_name) + + session.message_subscribe(queue=server_queue_name, destination=local_queue_name) + queue.start() + + return queue + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Main Body -- ---------------------------------------- + +# declare queues on the server + +news = "news-" + session.name +weather = "weather-" + session.name +usa = "usa-" + session.name +europe = "europe-" + session.name + +session.queue_declare(queue=news, exclusive=True) +session.queue_declare(queue=weather, exclusive=True) +session.queue_declare(queue=usa, exclusive=True) +session.queue_declare(queue=europe, exclusive=True) + +# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather". + +# The '#' symbol matches one component of a multipart name, e.g. "#.news" matches +# "europe.news" or "usa.news". + +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="#.news") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="#.weather") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="usa.#") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="europe.#") + +# Bind each queue to the control queue so we know when to stop + +session.exchange_bind(exchange="amq.topic", queue=news, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="control") +session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="control") + +# Remind the user to start the topic producer + +print "Queues created - please start the topic producer" +sys.stdout.flush() + +# Subscribe local queues to server queues + +local_news = "local_news" +local_weather = "local_weather" +local_usa = "local_usa" +local_europe = "local_europe" + +local_news_queue = subscribe_queue(news, local_news) +local_weather_queue = subscribe_queue(weather, local_weather) +local_usa_queue = subscribe_queue(usa, local_usa) +local_europe_queue = subscribe_queue(europe, local_europe) + +# Call dump_queue to print messages from each queue + +print "Messages on 'news' queue:" +dump_queue(local_news_queue) + +print "Messages on 'weather' queue:" +dump_queue(local_weather_queue) + +print "Messages on 'usa' queue:" +dump_queue(local_usa_queue) + +print "Messages on 'europe' queue:" +dump_queue(local_europe_queue) + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/pubsub/verify b/RC9/qpid/python/examples/pubsub/verify new file mode 100644 index 0000000000..cf1bade62e --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/verify @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Queues created" ./topic_subscriber.py +clients ./topic_publisher.py +outputs ./topic_publisher.py.out "topic_subscriber.py.out | remove_uuid | sort" diff --git a/RC9/qpid/python/examples/pubsub/verify.in b/RC9/qpid/python/examples/pubsub/verify.in new file mode 100644 index 0000000000..1b74acd832 --- /dev/null +++ b/RC9/qpid/python/examples/pubsub/verify.in @@ -0,0 +1,55 @@ +==== topic_publisher.py.out +==== topic_subscriber.py.out | remove_uuid | sort +europe.news 0 +europe.news 0 +europe.news 1 +europe.news 1 +europe.news 2 +europe.news 2 +europe.news 3 +europe.news 3 +europe.news 4 +europe.news 4 +europe.weather 0 +europe.weather 0 +europe.weather 1 +europe.weather 1 +europe.weather 2 +europe.weather 2 +europe.weather 3 +europe.weather 3 +europe.weather 4 +europe.weather 4 +Messages on 'europe' queue: +Messages on 'news' queue: +Messages on 'usa' queue: +Messages on 'weather' queue: +Queues created - please start the topic producer +Subscribing local queue 'local_europe' to europe-' +Subscribing local queue 'local_news' to news-' +Subscribing local queue 'local_usa' to usa-' +Subscribing local queue 'local_weather' to weather-' +That's all, folks! +That's all, folks! +That's all, folks! +That's all, folks! +usa.news 0 +usa.news 0 +usa.news 1 +usa.news 1 +usa.news 2 +usa.news 2 +usa.news 3 +usa.news 3 +usa.news 4 +usa.news 4 +usa.weather 0 +usa.weather 0 +usa.weather 1 +usa.weather 1 +usa.weather 2 +usa.weather 2 +usa.weather 3 +usa.weather 3 +usa.weather 4 +usa.weather 4 diff --git a/RC9/qpid/python/examples/request-response/client.py b/RC9/qpid/python/examples/request-response/client.py new file mode 100755 index 0000000000..b29fcf3ea7 --- /dev/null +++ b/RC9/qpid/python/examples/request-response/client.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + client.py + + Client for a client/server example + +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ------------------------------------------- + +def dump_queue(queue_name): + + print "Messages on queue: " + queue_name + + message = 0 + + while True: + try: + message = queue.get(timeout=10) + content = message.body + session.message_accept(RangedSet(message.id)) + print "Response: " + content + except Empty: + print "No more messages!" + break + except: + print "Unexpected exception!" + break + + +#----- Initialization -------------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Main Body -- ---------------------------------------- + +# Create a response queue for the server to send responses to. Use the +# same string as the name of the queue and the name of the routing +# key. + +reply_to = "reply_to:" + session.name +session.queue_declare(queue=reply_to, exclusive=True) +session.exchange_bind(exchange="amq.direct", queue=reply_to, binding_key=reply_to) + +# Create a local queue and subscribe it to the response queue + +local_queue_name = "local_queue" +queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages from +# the server's reply_to queue to our local client queue. The server +# will start delivering messages as soon as message credit is +# available. + +session.message_subscribe(queue=reply_to, destination=local_queue_name) +queue.start() + +# Send some messages to the server's request queue + +lines = ["Twas brillig, and the slithy toves", + "Did gyre and gimble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."] + +# We will use the same reply_to and routing key +# for each message + +message_properties = session.message_properties() +message_properties.reply_to = session.reply_to("amq.direct", reply_to) +delivery_properties = session.delivery_properties(routing_key="request") + +for line in lines: + print "Request: " + line + session.message_transfer(destination="amq.direct", message=Message(message_properties, delivery_properties, line)) + +# Now see what messages the server sent to our reply_to queue + +dump_queue(reply_to) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/request-response/server.py b/RC9/qpid/python/examples/request-response/server.py new file mode 100755 index 0000000000..a80c4541e4 --- /dev/null +++ b/RC9/qpid/python/examples/request-response/server.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + server.py + + Server for a client/server example +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ------------------------------------------- +def respond(session, request): + + # The routing key for the response is the request's reply-to + # property. The body for the response is the request's body, + # converted to upper case. + + message_properties = request.get("message_properties") + reply_to = message_properties.reply_to + if reply_to == None: + raise Exception("This message is missing the 'reply_to' property, which is required") + + props = session.delivery_properties(routing_key=reply_to["routing_key"]) + session.message_transfer(destination=reply_to["exchange"], message=Message(props,request.body.upper())) + +#----- Initialization -------------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Main Body -- ---------------------------------------- + +# Create a request queue and subscribe to it + +session.queue_declare(queue="request", exclusive=True) +session.exchange_bind(exchange="amq.direct", queue="request", binding_key="request") + +local_queue_name = "local_queue" + +session.message_subscribe(queue="request", destination=local_queue_name) + +queue = session.incoming(local_queue_name) +queue.start() + +# Remind the user to start the client program + +print "Request server running - run your client now." +print "(Times out after 100 seconds ...)" +sys.stdout.flush() + +# Respond to each request + +# If we get a message, send it back to the user (as indicated in the +# ReplyTo property) + +while True: + try: + request = queue.get(timeout=100) + respond(session, request) + session.message_accept(RangedSet(request.id)) + except Empty: + print "No more messages!" + break; + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. + +session.close(timeout=10) diff --git a/RC9/qpid/python/examples/request-response/verify b/RC9/qpid/python/examples/request-response/verify new file mode 100644 index 0000000000..3c058febb2 --- /dev/null +++ b/RC9/qpid/python/examples/request-response/verify @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Request server running" ./server.py +clients ./client.py +kill %% # Must kill the server. +outputs "./client.py.out | remove_uuid" " server.py.out | remove_uuid" diff --git a/RC9/qpid/python/examples/request-response/verify.in b/RC9/qpid/python/examples/request-response/verify.in new file mode 100644 index 0000000000..4c31128975 --- /dev/null +++ b/RC9/qpid/python/examples/request-response/verify.in @@ -0,0 +1,14 @@ +==== client.py.out | remove_uuid +Request: Twas brillig, and the slithy toves +Request: Did gyre and gimble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Messages on queue: reply_to: +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GYRE AND GIMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +No more messages! +==== server.py.out | remove_uuid +Request server running - run your client now. +(Times out after 100 seconds ...) diff --git a/RC9/qpid/python/examples/xml-exchange/declare_queues.py b/RC9/qpid/python/examples/xml-exchange/declare_queues.py new file mode 100755 index 0000000000..ca40af5dc5 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/declare_queues.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + declare_queues.py + + Creates and binds a queue on an AMQP direct exchange. + + All messages using the routing key "routing_key" are + sent to the queue named "message_queue". +""" + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Initialization ----------------------------------- + + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Create a queue ------------------------------------- + +# queue_declare() creates an AMQP queue, which is held +# on the broker. Published messages are sent to the AMQP queue, +# from which messages are delivered to consumers. +# +# queue_bind() determines which messages are routed to a queue. +# Route all messages with the routing key "routing_key" to +# the AMQP queue named "message_queue". + +session.exchange_declare(exchange="xml", type="xml") +session.queue_declare(queue="message_queue") + +binding = {} +binding["xquery"] = """ + let $w := ./weather + return $w/station = 'Raleigh-Durham International Airport (KRDU)' + and $w/temperature_f > 50 + and $w/temperature_f - $w/dewpoint > 5 + and $w/wind_speed_mph > 7 + and $w/wind_speed_mph < 20 """ + + +session.exchange_bind(exchange="xml", queue="message_queue", binding_key="weather", arguments=binding) + + +#----- Cleanup --------------------------------------------- + +session.close() + + diff --git a/RC9/qpid/python/examples/xml-exchange/listener.py b/RC9/qpid/python/examples/xml-exchange/listener.py new file mode 100755 index 0000000000..a56f5d6018 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/listener.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + listener.py + + This AMQP client reads messages from a message + queue named "message_queue". It is implemented + as a message listener. +""" + + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +# + +from time import sleep + + +#----- Message Receive Handler ----------------------------- +class Receiver: + def __init__ (self): + self.finalReceived = False + + def isFinal (self): + return self.finalReceived + + def Handler (self, message): + content = message.body + session.message_accept(RangedSet(message.id)) + print content + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_subscribe() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as local_queue.start() is called. + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +local_queue.start() + +receiver = Receiver () +local_queue.listen (receiver.Handler) + +sleep (10) + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close() diff --git a/RC9/qpid/python/examples/xml-exchange/verify b/RC9/qpid/python/examples/xml-exchange/verify new file mode 100644 index 0000000000..a93a32dc90 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/verify @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +clients ./declare_queues.py ./xml_producer.py ./xml_consumer.py +outputs ./declare_queues.py.out ./xml_producer.py.out ./xml_consumer.py.out diff --git a/RC9/qpid/python/examples/xml-exchange/verify.in b/RC9/qpid/python/examples/xml-exchange/verify.in new file mode 100644 index 0000000000..e5b9909408 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/verify.in @@ -0,0 +1,15 @@ +==== declare_queues.py.out +==== xml_producer.py.out +<weather><station>Raleigh-Durham International Airport (KRDU)</station><wind_speed_mph>0</wind_speed_mph><temperature_f>30</temperature_f><dewpoint>35</dewpoint></weather> +<weather><station>New Bern, Craven County Regional Airport (KEWN)</station><wind_speed_mph>2</wind_speed_mph><temperature_f>40</temperature_f><dewpoint>40</dewpoint></weather> +<weather><station>Boone, Watauga County Hospital Heliport (KTNB)</station><wind_speed_mph>5</wind_speed_mph><temperature_f>50</temperature_f><dewpoint>45</dewpoint></weather> +<weather><station>Hatteras, Mitchell Field (KHSE)</station><wind_speed_mph>10</wind_speed_mph><temperature_f>60</temperature_f><dewpoint>50</dewpoint></weather> +<weather><station>Raleigh-Durham International Airport (KRDU)</station><wind_speed_mph>16</wind_speed_mph><temperature_f>70</temperature_f><dewpoint>35</dewpoint></weather> +<weather><station>New Bern, Craven County Regional Airport (KEWN)</station><wind_speed_mph>22</wind_speed_mph><temperature_f>80</temperature_f><dewpoint>40</dewpoint></weather> +<weather><station>Boone, Watauga County Hospital Heliport (KTNB)</station><wind_speed_mph>28</wind_speed_mph><temperature_f>90</temperature_f><dewpoint>45</dewpoint></weather> +<weather><station>Hatteras, Mitchell Field (KHSE)</station><wind_speed_mph>35</wind_speed_mph><temperature_f>100</temperature_f><dewpoint>50</dewpoint></weather> +<weather><station>Raleigh-Durham International Airport (KRDU)</station><wind_speed_mph>42</wind_speed_mph><temperature_f>30</temperature_f><dewpoint>35</dewpoint></weather> +<weather><station>New Bern, Craven County Regional Airport (KEWN)</station><wind_speed_mph>51</wind_speed_mph><temperature_f>40</temperature_f><dewpoint>40</dewpoint></weather> +==== xml_consumer.py.out +<weather><station>Raleigh-Durham International Airport (KRDU)</station><wind_speed_mph>16</wind_speed_mph><temperature_f>70</temperature_f><dewpoint>35</dewpoint></weather> +No more messages! diff --git a/RC9/qpid/python/examples/xml-exchange/xml_consumer.py b/RC9/qpid/python/examples/xml-exchange/xml_consumer.py new file mode 100755 index 0000000000..cd89110b05 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/xml_consumer.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + direct_consumer.py + + This AMQP client reads messages from a message + queue named "message_queue". +""" + +import qpid +import sys +import os +from random import randint +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + + +#----- Initialization -------------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + + +#----- Read from queue -------------------------------------------- + +# Now let's create a local client queue and tell it to read +# incoming messages. + +# The consumer tag identifies the client-side queue. + +local_queue_name = "local_queue" +local_queue = session.incoming(local_queue_name) + +# Call message_consume() to tell the broker to deliver messages +# from the AMQP queue to this local client queue. The broker will +# start delivering messages as soon as local_queue.start() is called. + +session.message_subscribe(queue="message_queue", destination=local_queue_name) +local_queue.start() + +# Initialize 'final' and 'content', variables used to identify the last message. + +message = None +while True: + try: + message = local_queue.get(timeout=10) + session.message_accept(RangedSet(message.id)) + content = message.body + print content + except Empty: + print "No more messages!" + break + + +#----- Cleanup ------------------------------------------------ + +# Clean up before exiting so there are no open threads. +# + +session.close() diff --git a/RC9/qpid/python/examples/xml-exchange/xml_producer.py b/RC9/qpid/python/examples/xml-exchange/xml_producer.py new file mode 100755 index 0000000000..fa97cab4e1 --- /dev/null +++ b/RC9/qpid/python/examples/xml-exchange/xml_producer.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" + xml_producer.py + + Publishes messages to an XML exchange, using + the routing key "weather" +""" + + +import qpid +import sys +import os +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +#----- Functions ---------------------------------------- + +# Data for weather reports + +station = ("Raleigh-Durham International Airport (KRDU)", + "New Bern, Craven County Regional Airport (KEWN)", + "Boone, Watauga County Hospital Heliport (KTNB)", + "Hatteras, Mitchell Field (KHSE)") +wind_speed_mph = ( 0, 2, 5, 10, 16, 22, 28, 35, 42, 51, 61, 70, 80 ) +temperature_f = ( 30, 40, 50, 60, 70, 80, 90, 100 ) +dewpoint = ( 35, 40, 45, 50 ) + +def pick_one(list, i): + return str( list [ i % len(list)] ) + +def report(i): + return "<weather>" + "<station>" + pick_one(station,i)+ "</station>" + "<wind_speed_mph>" + pick_one(wind_speed_mph,i) + "</wind_speed_mph>" + "<temperature_f>" + pick_one(temperature_f,i) + "</temperature_f>" + "<dewpoint>" + pick_one(dewpoint,i) + "</dewpoint>" + "</weather>" + + +#----- Initialization ----------------------------------- + +# Set parameters for login + +host="127.0.0.1" +port=5672 +user="guest" +password="guest" + +# If an alternate host or port has been specified, use that instead +# (this is used in our unit tests) +if len(sys.argv) > 1 : + host=sys.argv[1] +if len(sys.argv) > 2 : + port=int(sys.argv[2]) + +# Create a connection. +socket = connect(host, port) +connection = Connection (sock=socket, username=user, password=password) +connection.start() +session = connection.session(str(uuid4())) + +#----- Publish some messages ------------------------------ + +# Create some messages and put them on the broker. + +props = session.delivery_properties(routing_key="weather") + +for i in range(10): + print report(i) + session.message_transfer(destination="xml", message=Message(props, report(i))) + + +#----- Cleanup -------------------------------------------- + +# Clean up before exiting so there are no open threads. + +session.close() |