diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 51 |
1 files changed, 49 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f900a841d5..6f8fa344d5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -63,6 +63,7 @@ class HaBroker(Broker): args = copy(args) args += ["--load-module", BrokerTest.ha_lib, "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -188,7 +189,7 @@ class HaCluster(object): self.broker_id += 1 return name - def start(self, update_urls=True): + def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) self._brokers.append(b) @@ -758,6 +759,52 @@ acl deny all all s1.sender("ex").send("foo"); self.assertEqual(s1.receiver("q").fetch().content, "foo") + def test_alterante_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
