From 09bd97be6159c787ee49bf995a43990742780d0e Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 9 May 2008 19:26:28 +0000 Subject: QPID-1045 and QPID-1041: added a destination attribute to incoming queues, and added a start() method to incoming queues as syntactic sugar for the verbose message flow idiom git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654918 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/session.py | 13 ++++++++++++- qpid/python/qpid/spec010.py | 10 ++++++++-- qpid/python/tests_0-10/message.py | 18 +++++++++++++++++- 3 files changed, 37 insertions(+), 4 deletions(-) (limited to 'qpid/python') diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py index a1103e0428..aa0446b941 100644 --- a/qpid/python/qpid/session.py +++ b/qpid/python/qpid/session.py @@ -75,7 +75,7 @@ class Session(Invoker): try: queue = self._incoming.get(destination) if queue == None: - queue = Queue() + queue = Incoming(self, destination) self._incoming[destination] = queue return queue finally: @@ -319,6 +319,17 @@ class Sender: for range in commands.ranges: self._completed.add(range.lower, range.upper) +class Incoming(Queue): + + def __init__(self, session, destination): + Queue.__init__(self) + self.session = session + self.destination = destination + + def start(self): + for unit in self.session.credit_unit.values(): + self.session.message_flow(self.destination, unit, 0xFFFFFFFF) + class Delegate: def __init__(self, session): diff --git a/qpid/python/qpid/spec010.py b/qpid/python/qpid/spec010.py index fb625eab65..58d305aa6c 100644 --- a/qpid/python/qpid/spec010.py +++ b/qpid/python/qpid/spec010.py @@ -170,10 +170,14 @@ class Enum: def __init__(self, name): self.name = name + self._names = () + self._values = () + + def values(self): + return self._values def __repr__(self): - return "%s(%s)" % (self.name, ", ".join([k for k in self.__dict__.keys() - if k != "name"])) + return "%s(%s)" % (self.name, ", ".join(self._names)) class Choice(Named, Node): @@ -192,6 +196,8 @@ class Choice(Named, Node): enum = Enum(node.name) node.spec.enums[node.name] = enum setattr(enum, self.name, self.value) + enum._names += (self.name,) + enum._values += (self.value,) class Composite(Type, Coded): diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index e21716d855..8f3d7bdaef 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -229,6 +229,7 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) session.message_subscribe(destination="my-consumer", queue="test-queue-4") + myqueue = session.incoming("my-consumer") session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) @@ -237,7 +238,6 @@ class MessageTests(TestBase010): #cancel should stop messages being delivered session.message_cancel(destination="my-consumer") session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) - myqueue = session.incoming("my-consumer") msg = myqueue.get(timeout=1) self.assertEqual("One", msg.body) try: @@ -1001,6 +1001,22 @@ class MessageTests(TestBase010): self.assertEquals("", msg.body) session.message_accept(RangedSet(msg.id)) + def test_incoming_start(self): + q = "test_incoming_start" + session = self.session + + session.queue_declare(queue=q, exclusive=True, auto_delete=True) + session.message_subscribe(queue=q, destination="msgs") + messages = session.incoming("msgs") + assert messages.destination == "msgs" + + dp = session.delivery_properties(routing_key=q) + session.message_transfer(message=Message(dp, "test")) + + messages.start() + msg = messages.get() + assert msg.body == "test" + def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) -- cgit v1.2.1