summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py51
1 files changed, 49 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f900a841d5..6f8fa344d5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -63,6 +63,7 @@ class HaBroker(Broker):
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
+ "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12:
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -188,7 +189,7 @@ class HaCluster(object):
self.broker_id += 1
return name
- def start(self, update_urls=True):
+ def start(self, update_urls=True, args=[]):
"""Start a new broker in the cluster"""
b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
self._brokers.append(b)
@@ -758,6 +759,52 @@ acl deny all all
s1.sender("ex").send("foo");
self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ def test_alterante_exchange(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated
+ to new members of a cluster. """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ # altex exchange: acts as alternate exchange
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ # altq queue bound to altex, collect re-routed messages.
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ # 0ex exchange with alternate-exchange altex and no queues bound
+ s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # create queue q with alternate-exchange altex
+ s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ # create a bunch of exchanges to ensure we don't clean up prematurely if the
+ # response comes in multiple fragments.
+ for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("0ex").send("foo")
+ altq = s.receiver("altq")
+ self.assertEqual("foo", altq.fetch(timeout=0).content)
+ s.acknowledge()
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", altq.fetch(timeout=0).content)
+ s.acknowledge()
+
+ # Sanity check: alternate exchanges on original broker
+ verify(cluster[0])
+ # Check backup that was connected during setup.
+ cluster[1].wait_backup("0ex")
+ cluster[1].wait_backup("q")
+ cluster.bounce(0)
+ verify(cluster[1])
+ # Check a newly started backup.
+ cluster.start()
+ cluster[2].wait_backup("0ex")
+ cluster[2].wait_backup("q")
+ cluster.bounce(1)
+ verify(cluster[2])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit