summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rwxr-xr-xqpid/cpp/src/tests/federation.py550
-rwxr-xr-xqpid/cpp/src/tests/run_federation_tests13
2 files changed, 560 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 5f269c8363..c5e379b829 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -22,8 +22,32 @@ import sys
from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
+from qpid.util import URL
from time import sleep
+
+class _FedBroker(object):
+ """
+ A proxy object for a remote broker. Contains connection and management
+ state.
+ """
+ def __init__(self, host, port,
+ conn=None, session=None, qmf_broker=None):
+ self.host = host
+ self.port = port
+ self.url = "%s:%d" % (host, port)
+ self.client_conn = None
+ self.client_session = None
+ self.qmf_broker = None
+ self.qmf_object = None
+ if conn is not None:
+ self.client_conn = conn
+ if session is not None:
+ self.client_session = session
+ if qmf_broker is not None:
+ self.qmf_broker = qmf_broker
+
+
class FederationTests(TestBase010):
def remote_host(self):
@@ -43,6 +67,44 @@ class FederationTests(TestBase010):
sleep(1)
total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link"))
+ def _setup_brokers(self):
+ ports = [self.remote_port()]
+ extra = self.defines.get("extra-brokers")
+ if extra:
+ for p in extra.split():
+ ports.append(int(p))
+
+ # broker[0] has already been set up.
+ self._brokers = [_FedBroker(self.broker.host,
+ self.broker.port,
+ self.conn,
+ self.session,
+ self.qmf_broker)]
+ self._brokers[0].qmf_object = self.qmf.getObjects(_class="broker")[0]
+
+ # setup remaining brokers
+ for _p in ports:
+ _b = _FedBroker(self.remote_host(), _p)
+ _b.client_conn = self.connect(host=self.remote_host(), port=_p)
+ _b.client_session = _b.client_conn.session("Fed_client_session_" + str(_p))
+ _b.qmf_broker = self.qmf.addBroker(_b.url)
+ for _bo in self.qmf.getObjects(_class="broker"):
+ if _bo.getBroker().getUrl() == _b.qmf_broker.getUrl():
+ _b.qmf_object = _bo
+ break
+ self._brokers.append(_b)
+
+ def _teardown_brokers(self):
+ """ Un-does _setup_brokers()
+ """
+ # broker[0] is configured at test setup, so it must remain configured
+ for _b in self._brokers[1:]:
+ self.qmf.delBroker(_b.qmf_broker)
+ if not _b.client_session.error():
+ _b.client_session.close(timeout=10)
+ _b.client_conn.close(timeout=10)
+
+
def test_bridge_create_and_close(self):
self.startQmf();
qmf = self.qmf
@@ -788,6 +850,494 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_direct_route_prop(self):
+ """ Set up a tree of uni-directional routes across the direct exchange.
+ Bind the same key to the same queues on the leaf nodes. Verify a
+ message sent with the routing key transverses the tree an arrives at
+ each leaf. Remove one leaf's queue, and verify that messages still
+ reach the other leaf.
+
+ Route Topology:
+
+ +---> B2 queue:"test-queue", binding key:"spudboy"
+ B0 --> B1 --+
+ +---> B3 queue:"test-queue", binding key:"spudboy"
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create direct exchange on each broker
+
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+
+ # connect B0 --> B1
+ result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
+ self._brokers[0].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B2
+ result = self._brokers[2].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B3
+ result = self._brokers[3].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # for each link, bridge the "fedX.direct" exchanges:
+
+ for _l in qmf.getObjects(_class="link"):
+ # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+ result = _l.bridge(False, # durable
+ "fedX.direct", # src
+ "fedX.direct", # dst
+ "", # key
+ "", # tag
+ "", # excludes
+ False, # srcIsQueue
+ False, # srcIsLocal
+ True, # dynamic
+ 0) # sync
+ self.assertEqual(result.status, 0)
+
+ # create a queue on B2, bound to "spudboy"
+ self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
+
+ # create a queue on B3, bound to "spudboy"
+ self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
+
+ # subscribe to messages arriving on B2's queue
+ self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1")
+ queue_2 = self._brokers[2].client_session.incoming("f1")
+
+ # subscribe to messages arriving on B3's queue
+ self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1")
+ queue_3 = self._brokers[3].client_session.incoming("f1")
+
+ # wait until the binding key has propagated to each broker (twice at
+ # broker 2)
+
+ retries = 0
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.bindingKey == "spudboy":
+ count += 1
+ while count < 5:
+ retries += 1
+ if retries >= 10:
+ self.fail("binding did not propagate to all brokers!")
+ return
+ sleep(1)
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.bindingKey == "spudboy":
+ count += 1
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
+ self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B2 and B3
+ for i in range(1, 11):
+ msg = queue_2.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_2.get(timeout=1)
+ self.fail("Got unexpected message in queue_2: " + extra.body)
+ except Empty: None
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # tear down the queue on B2
+ self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
+ self._brokers[2].client_session.message_cancel(destination="f1")
+ self._brokers[2].client_session.queue_delete(queue="fedX1")
+
+ # @todo - find a proper way to check the propagation here!
+ sleep(6)
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
+ self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B3 only
+ for i in range(1, 11):
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # cleanup
+
+ self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
+ self._brokers[3].client_session.message_cancel(destination="f1")
+ self._brokers[3].client_session.queue_delete(queue="fedX1")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+ def test_dynamic_topic_route_prop(self):
+ """ Set up a tree of uni-directional routes across a topic exchange.
+ Bind the same key to the same queues on the leaf nodes. Verify a
+ message sent with the routing key transverses the tree an arrives at
+ each leaf. Remove one leaf's queue, and verify that messages still
+ reach the other leaf.
+
+ Route Topology:
+
+ +---> B2 queue:"test-queue", binding key:"spud.*"
+ B0 --> B1 --+
+ +---> B3 queue:"test-queue", binding key:"spud.*"
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create topic exchange on each broker
+
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+
+ # connect B0 --> B1
+ result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
+ self._brokers[0].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B2
+ result = self._brokers[2].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B3
+ result = self._brokers[3].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # for each link, bridge the "fedX.topic" exchanges:
+
+ for _l in qmf.getObjects(_class="link"):
+ # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+ result = _l.bridge(False, # durable
+ "fedX.topic", # src
+ "fedX.topic", # dst
+ "", # key
+ "", # tag
+ "", # excludes
+ False, # srcIsQueue
+ False, # srcIsLocal
+ True, # dynamic
+ 0) # sync
+ self.assertEqual(result.status, 0)
+
+ # create a queue on B2, bound to "spudboy"
+ self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*")
+
+ # create a queue on B3, bound to "spudboy"
+ self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*")
+
+ # subscribe to messages arriving on B2's queue
+ self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1")
+ queue_2 = self._brokers[2].client_session.incoming("f1")
+
+ # subscribe to messages arriving on B3's queue
+ self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1")
+ queue_3 = self._brokers[3].client_session.incoming("f1")
+
+ # wait until the binding key has propagated to each broker (twice at
+ # broker 2)
+
+ retries = 0
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.bindingKey == "spud.*":
+ count += 1
+ while count < 5:
+ retries += 1
+ if retries >= 10:
+ self.fail("binding did not propagate to all brokers!")
+ return
+ sleep(1)
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.bindingKey == "spud.*":
+ count += 1
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
+ self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B2 and B3
+ for i in range(1, 11):
+ msg = queue_2.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_2.get(timeout=1)
+ self.fail("Got unexpected message in queue_2: " + extra.body)
+ except Empty: None
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # tear down the queue on B2
+ self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*")
+ self._brokers[2].client_session.message_cancel(destination="f1")
+ self._brokers[2].client_session.queue_delete(queue="fedX1")
+
+ # @todo - find a proper way to check the propagation here!
+ sleep(6)
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
+ self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B3 only
+ for i in range(1, 11):
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # cleanup
+
+ self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*")
+ self._brokers[3].client_session.message_cancel(destination="f1")
+ self._brokers[3].client_session.queue_delete(queue="fedX1")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.topic")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
+ def test_dynamic_fanout_route_prop(self):
+ """ Set up a tree of uni-directional routes across a fanout exchange.
+ Bind the same key to the same queues on the leaf nodes. Verify a
+ message sent with the routing key transverses the tree an arrives at
+ each leaf. Remove one leaf's queue, and verify that messages still
+ reach the other leaf.
+
+ Route Topology:
+
+ +---> B2 queue:"test-queue", binding key:"spud.*"
+ B0 --> B1 --+
+ +---> B3 queue:"test-queue", binding key:"spud.*"
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create fanout exchange on each broker
+
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout")
+
+ # connect B0 --> B1
+ result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
+ self._brokers[0].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B2
+ result = self._brokers[2].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # connect B1 --> B3
+ result = self._brokers[3].qmf_object.connect(self._brokers[1].host,
+ self._brokers[1].port,
+ False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ # for each link, bridge the "fedX.fanout" exchanges:
+
+ for _l in qmf.getObjects(_class="link"):
+ # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+ result = _l.bridge(False, # durable
+ "fedX.fanout", # src
+ "fedX.fanout", # dst
+ "", # key
+ "", # tag
+ "", # excludes
+ False, # srcIsQueue
+ False, # srcIsLocal
+ True, # dynamic
+ 0) # sync
+ self.assertEqual(result.status, 0)
+
+ # create a queue on B2, bound to the exchange
+ self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout")
+
+ # create a queue on B3, bound to the exchange
+ self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+ self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout")
+
+ # subscribe to messages arriving on B2's queue
+ self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1")
+ queue_2 = self._brokers[2].client_session.incoming("f1")
+
+ # subscribe to messages arriving on B3's queue
+ self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1")
+ queue_3 = self._brokers[3].client_session.incoming("f1")
+
+ # wait until the binding has propagated to each broker by
+ # counting the number of bindings that have origins.
+ # Should have 3: 1 for B0, 2 for B1.
+
+ retries = 0
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.origin:
+ count += 1
+ while count < 3:
+ retries += 1
+ if retries >= 10:
+ self.fail("binding did not propagate to all brokers!")
+ return
+ sleep(1)
+ count = 0
+ for xxx in qmf.getObjects(_class="binding"):
+ if xxx.origin:
+ count += 1
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties()
+ self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B2 and B3
+ for i in range(1, 11):
+ msg = queue_2.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_2.get(timeout=1)
+ self.fail("Got unexpected message in queue_2: " + extra.body)
+ except Empty: None
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # tear down the queue on B2
+ self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout")
+ self._brokers[2].client_session.message_cancel(destination="f1")
+ self._brokers[2].client_session.queue_delete(queue="fedX1")
+
+ # @todo - find a proper way to check the propagation here!
+ sleep(6)
+
+ # send 10 msgs from B0
+ for i in range(1, 11):
+ dp = self._brokers[0].client_session.delivery_properties()
+ self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
+
+ # get exactly 10 msgs on B3 only
+ for i in range(1, 11):
+ msg = queue_3.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue_3.get(timeout=1)
+ self.fail("Got unexpected message in queue_3: " + extra.body)
+ except Empty: None
+
+ # cleanup
+
+ self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout")
+ self._brokers[3].client_session.message_cancel(destination="f1")
+ self._brokers[3].client_session.queue_delete(queue="fedX1")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.fanout")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
def getProperty(self, msg, name):
for h in msg.headers:
diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests
index 8d7954a533..f5bb123d0a 100755
--- a/qpid/cpp/src/tests/run_federation_tests
+++ b/qpid/cpp/src/tests/run_federation_tests
@@ -30,20 +30,27 @@ start_brokers() {
LOCAL_PORT=`cat qpidd.port`
../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
REMOTE_PORT=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+ REMOTE_B1=`cat qpidd.port`
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+ REMOTE_B2=`cat qpidd.port`
}
stop_brokers() {
$QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT
$QPIDD_EXEC --no-module-dir -q --port $REMOTE_PORT
+ $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B1
+ $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B2
}
if test -d ${PYTHON_DIR} ; then
start_brokers
- echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
- $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT $@
+ echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2"
+ $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
RETCODE=$?
stop_brokers
- if test x$RETCODE != x0; then
+ if test x$RETCODE != x0; then
echo "FAIL federation tests"; exit 1;
fi
fi