summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-20 18:28:58 +0000
committerAlan Conway <aconway@apache.org>2012-12-20 18:28:58 +0000
commitcdc1d5ce0b40acfea625c8c43e41372bac3a0f8a (patch)
tree6587beccb8fd3a25cf0af40f6f2ed3fafc1e8d6d /qpid/cpp/src/tests/ha_tests.py
parentda579f1fe6401fa548ee4fee90dea028a5fe3b68 (diff)
downloadqpid-python-cdc1d5ce0b40acfea625c8c43e41372bac3a0f8a.tar.gz
Bug 886656 - HA backup broker does not properly increment the alternate exchange user count
Set alternate exchange in-use counters correctly on backup brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py35
1 files changed, 28 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index b29ff42627..fdbd8a153b 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
from ha_test import *
@@ -613,22 +613,24 @@ acl deny all all
to new members of a cluster. """
cluster = HaCluster(self, 2)
s = cluster[0].connect().session()
+ cluster[0].wait_status("active")
+ cluster[1].wait_status("ready")
# altex exchange: acts as alternate exchange
s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
# altq queue bound to altex, collect re-routed messages.
s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
- # 0ex exchange with alternate-exchange altex and no queues bound
- s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # ex exchange with alternate-exchange altex and no queues bound
+ s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
# create queue q with alternate-exchange altex
s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
# create a bunch of exchanges to ensure we don't clean up prematurely if the
# response comes in multiple fragments.
- for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+ for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
def verify(broker):
s = broker.connect().session()
# Verify unmatched message goes to ex's alternate.
- s.sender("0ex").send("foo")
+ s.sender("ex").send("foo")
altq = s.receiver("altq")
self.assertEqual("foo", altq.fetch(timeout=0).content)
s.acknowledge()
@@ -640,20 +642,39 @@ acl deny all all
self.assertEqual("bar", altq.fetch(timeout=0).content)
s.acknowledge()
+ def ss(n): return cluster[n].connect().session()
+
# Sanity check: alternate exchanges on original broker
verify(cluster[0])
+ # Altex is in use as an alternate exchange.
+ self.assertRaises(SessionError,
+ lambda:ss(0).sender("altex;{delete:always}").close())
# Check backup that was connected during setup.
- cluster[1].wait_backup("0ex")
+ cluster[1].wait_status("ready")
+ cluster[1].wait_backup("ex")
cluster[1].wait_backup("q")
cluster.bounce(0)
verify(cluster[1])
+
# Check a newly started backup.
cluster.start()
- cluster[2].wait_backup("0ex")
+ cluster[2].wait_status("ready")
+ cluster[2].wait_backup("ex")
cluster[2].wait_backup("q")
cluster.bounce(1)
verify(cluster[2])
+ # Check that alt-exchange in-use count is replicated
+ s = cluster[2].connect().session();
+
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("q;{delete:always}").close()
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("ex;{delete:always}").close()
+ s.sender("altex;{delete:always}").close()
+
def test_priority_reroute(self):
"""Regression test for QPID-4262, rerouting messages from a priority queue
to itself causes a crash"""