diff options
author | Gordon Sim <gsim@apache.org> | 2011-02-10 10:12:41 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-02-10 10:12:41 +0000 |
commit | 731d6c4b13ed7ae5941a4b0f969be357f3d7e831 (patch) | |
tree | 5fc47c2ce19bbc0872356ef9c5f5ef073752f2cb /tests | |
parent | 8ead4c97b75e508a877e8d446a5bef096e606d84 (diff) | |
download | qpid-python-731d6c4b13ed7ae5941a4b0f969be357f3d7e831.tar.gz |
QPID-529: Priority queue implementation
QPID-2104: LVQ enhancement
These both required some refactoring of the Queue class to allow cleaner implementation of different types of behaviour. The in-memory storage of messages is now abstracted out behind an interface specified by qpid::broker::Messages which qpid::broker::Queue uses. Different implementations of that are available for the standard FIFO queue, priority queues and LVQ (I have also separated out the 'legacy' implementation of LVQ from the new version driven by QPID-2104).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1069322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tests')
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/__init__.py | 2 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/lvq.py | 75 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/priority.py | 221 |
3 files changed, 298 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py index f9315a6f90..b70828224d 100644 --- a/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -29,3 +29,5 @@ from message import * from query import * from queue import * from tx import * +from lvq import * +from priority import * diff --git a/tests/src/py/qpid_tests/broker_0_10/lvq.py b/tests/src/py/qpid_tests/broker_0_10/lvq.py new file mode 100644 index 0000000000..8fd6b88d78 --- /dev/null +++ b/tests/src/py/qpid_tests/broker_0_10/lvq.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +import math + +class LVQTests (Base): + """ + Test last value queue behaviour + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def test_simple(self): + snd = self.ssn.sender("lvq; {create: sender, delete: sender, node: {x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}", + durable=self.durable()) + snd.send(create_message("a", "a-1")) + snd.send(create_message("b", "b-1")) + snd.send(create_message("a", "a-2")) + snd.send(create_message("a", "a-3")) + snd.send(create_message("c", "c-1")) + snd.send(create_message("c", "c-2")) + + rcv = self.ssn.receiver("lvq; {mode: browse}") + assert fetch_all(rcv) == ["b-1", "a-3", "c-2"] + + snd.send(create_message("b", "b-2")) + assert fetch_all(rcv) == ["b-2"] + + snd.send(create_message("c", "c-3")) + snd.send(create_message("d", "d-1")) + assert fetch_all(rcv) == ["c-3", "d-1"] + + snd.send(create_message("b", "b-3")) + assert fetch_all(rcv) == ["b-3"] + + rcv.close() + rcv = self.ssn.receiver("lvq; {mode: browse}") + assert (fetch_all(rcv) == ["a-3", "c-3", "d-1", "b-3"]) + + +def create_message(key, content): + msg = Message(content=content) + msg.properties["lvq-key"] = key + return msg + +def fetch_all(rcv): + content = [] + while True: + try: + content.append(rcv.fetch(0).content) + except Empty: + break + return content diff --git a/tests/src/py/qpid_tests/broker_0_10/priority.py b/tests/src/py/qpid_tests/broker_0_10/priority.py new file mode 100644 index 0000000000..81b3873cda --- /dev/null +++ b/tests/src/py/qpid_tests/broker_0_10/priority.py @@ -0,0 +1,221 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +from qpid.compat import set +import math + +class PriorityTests (Base): + """ + Test prioritised messaging + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def prioritised_delivery(self, priorities, levels=10): + """ + Test that message on a queue are delivered in priority order. + """ + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels, + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + for expected in sorted(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10): + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + limit_policy = "x-qpid-fairshare:%s" % default_limit + if limits: + for k, v in limits.items(): + limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v) + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}" + % (levels, limit_policy), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + if limits: + limit_function = lambda x : limits.get(x, 0) + else: + limit_function = lambda x : default_limit + for expected in fairshare(sorted(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True), + limit_function, levels): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.priority == expected.priority + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def test_prioritised_delivery_1(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10) + + def test_prioritised_delivery_2(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5) + + def test_fairshare_1(self): + self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) + + def test_fairshare_2(self): + self.fairshare_delivery(priorities = [10 for i in range(30)]) + + def test_fairshare_3(self): + self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8) + + def test_browsing(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}") + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + #check all messages on the queue were received by the browser; don't relay on any specific ordering at present + assert set([m.content for m in msgs]) == set([m.content for m in received]) + + def ring_queue_check(self, msgs): + """ + Ensure that a ring queue removes lowest priority messages first. + """ + snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + + expected = [] + for m in msgs: + while len(expected) > 9: + expected.sort(key=lambda x: priority_level(x.priority,10)) + expected.pop(0) + expected.append(m) + #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received]) + assert [m.content for m in expected] == [m.content for m in received] + + def test_ring_queue_1(self): + priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_ring_queue_2(self): + priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_requeue(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + #want to have some messages requeued so enable prefetch on a dummy receiver + other = self.conn.session() + dummy = other.receiver("priority-queue") + dummy.capacity = 10 + + for m in msgs: snd.send(m) + + #fetch some with dummy receiver on which prefetch is also enabled + for i in range(5): + msg = dummy.fetch(0) + #close session without acknowledgements to requeue messages + other.close() + + #now test delivery works as expected after that + rcv = self.ssn.receiver(snd.target) + for expected in sorted(msgs, key=lambda m: priority_level(m.priority,10), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + +def content(base, counter=1): + while True: + yield "%s-%s" % (base, counter) + counter += 1 + +def address(name, create_policy="sender", delete_policy="receiver", arguments=None): + if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments + else: node = "node: {}" + return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node) + +def fairshare(msgs, limit, levels): + """ + Generator to return prioritised messages in expected order for a given fairshare limit + """ + count = 0 + last_priority = None + postponed = [] + while msgs or postponed: + if not msgs: + msgs = postponed + count = 0 + last_priority = None + postponed = [] + msg = msgs.pop(0) + if last_priority and priority_level(msg.priority, levels) == last_priority: + count += 1 + else: + last_priority = priority_level(msg.priority, levels) + count = 1 + l = limit(last_priority) + if (l and count > l): + postponed.append(msg) + else: + yield msg + return + +def effective_priority(value, levels): + """ + Method to determine effective priority given a distinct number of + levels supported. Returns the lowest priority value that is of + equivalent priority to the value passed in. + """ + if value <= 5-math.ceil(levels/2.0): return 0 + if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0) + return value + +def priority_level(value, levels): + """ + Method to determine which of a distinct number of priority levels + a given value falls into. + """ + offset = 5-math.ceil(levels/2.0) + return min(max(value - offset, 0), levels-1) |