diff options
Diffstat (limited to 'python/examples/pubsub/topic_publisher.py')
| -rwxr-xr-x | python/examples/pubsub/topic_publisher.py | 59 |
1 files changed, 25 insertions, 34 deletions
diff --git a/python/examples/pubsub/topic_publisher.py b/python/examples/pubsub/topic_publisher.py index e302d58ad4..b79896eaf6 100755 --- a/python/examples/pubsub/topic_publisher.py +++ b/python/examples/pubsub/topic_publisher.py @@ -9,8 +9,11 @@ import qpid import sys -from qpid.client import Client -from qpid.content import Content +import os +from random import randint +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message from qpid.queue import Empty #----- Initialization ----------------------------------- @@ -18,18 +21,20 @@ from qpid.queue import Empty # Set parameters for login. host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1" port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672 -amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml" user="guest" password="guest" +amqp_spec="" -# Create a client and log in to it. +try: + amqp_spec = os.environ["AMQP_SPEC"] +except KeyError: + amqp_spec="/usr/share/amqp/amqp.0-10.xml" -spec = qpid.spec.load(amqp_spec) -client = Client(host, port, spec) -client.start({"LOGIN": user, "PASSWORD": password}) +# Create a connection. +conn = Connection (connect (host,port), qpid.spec.load(amqp_spec)) +conn.start() -session = client.session() -session.session_open() +session = conn.session(str(randint(1,64*1024))) #----- Publish some messages ------------------------------ @@ -37,44 +42,30 @@ session.session_open() # topic exchange. The routing keys are "usa.news", "usa.weather", # "europe.news", and "europe.weather". +def send_msg(routing_key): + props = session.delivery_properties(routing_key=routing_key) + for i in range(5): + session.message_transfer("amq.topic", None, None, Message(props,"message " + str(i))) # usa.news - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "usa.news" - session.message_transfer(destination="amq.topic", content=message) +send_msg("usa.news") # usa.weather - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "usa.weather" - session.message_transfer(destination="amq.topic", content=message) +send_msg("usa.weather") # europe.news - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "europe.news" - session.message_transfer(destination="amq.topic", content=message) +send_msg("europe.news") # europe.weather - -for i in range(5): - message = Content("message " + str(i)) - message["routing_key"] = "europe.weather" - session.message_transfer(destination="amq.topic", content=message) +send_msg("europe.weather") # Signal termination - -message = Content("That's all, folks!") -message["routing_key"] = "control" -session.message_transfer(destination="amq.topic", content=message) +props = session.delivery_properties(routing_key="control") +session.message_transfer("amq.topic",None, None, Message(props,"That's all, folks!")) #----- Cleanup -------------------------------------------- # Clean up before exiting so there are no open threads. -session.session_close() +session.close(timeout=10) |
