diff options
| author | Alan Conway <aconway@apache.org> | 2012-07-13 18:45:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-07-13 18:45:23 +0000 |
| commit | c22cb4bef13a4a12a0c4098bad6da44c1a2cd444 (patch) | |
| tree | 69fb4648a49723a312bf1d8a06ab2ac3fcd8d33c /qpid/cpp/src/tests | |
| parent | 9170505cdd578ca48679d1245da6a4c1acefc2b0 (diff) | |
| download | qpid-python-c22cb4bef13a4a12a0c4098bad6da44c1a2cd444.tar.gz | |
QPID-4107 HA does not replicate alternate-exchange
Set alternate exchange on replicated queues and exchanges. If the exchange is
available, set it immediately. Otherwise remember what needs to be set so it can
be set when the exchange becomes available.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1361334 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 51 |
1 files changed, 49 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f900a841d5..6f8fa344d5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -63,6 +63,7 @@ class HaBroker(Broker): args = copy(args) args += ["--load-module", BrokerTest.ha_lib, "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -188,7 +189,7 @@ class HaCluster(object): self.broker_id += 1 return name - def start(self, update_urls=True): + def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) self._brokers.append(b) @@ -758,6 +759,52 @@ acl deny all all s1.sender("ex").send("foo"); self.assertEqual(s1.receiver("q").fetch().content, "foo") + def test_alterante_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
