summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/interlink_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/interlink_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py80
1 files changed, 75 insertions, 5 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 129283ac24..608d4ac890 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
import traceback
from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from brokertest import *
+from ha_test import HaPort
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
def setUp(self):
BrokerTest.setUp(self)
os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
- self.broker = self.amqp_broker()
+ self.port_holder = HaPort(self)
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
@@ -126,6 +128,9 @@ class AmqpBrokerTest(BrokerTest):
def test_translate2(self):
self.send_and_receive(send_config=Config(self.broker, version="amqp0-10"))
+ def test_translate_with_large_routingkey(self):
+ self.send_and_receive(send_config=Config(self.broker, address="amq.topic/a.%s" % ("x" * 256), version="amqp1.0"), recv_config=Config(self.broker, address="amq.topic/a.*", version="amqp0-10"), wait_for_receiver=True)
+
def send_and_receive_empty(self, send_config=None, recv_config=None):
sconfig = send_config or self.default_config
rconfig = recv_config or self.default_config
@@ -218,16 +223,22 @@ class AmqpBrokerTest(BrokerTest):
assert len(domains) == 1
assert domains[0].name == "BrokerB"
- def test_incoming_link(self):
+ def incoming_link(self, mechanism):
brokerB = self.amqp_broker()
agentB = BrokerAgent(brokerB.connect())
self.agent.create("queue", "q")
agentB.create("queue", "q")
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism})
self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
#send to brokerB, receive from brokerA
self.send_and_receive(send_config=Config(brokerB))
+ def test_incoming_link_anonymous(self):
+ self.incoming_link("ANONYMOUS")
+
+ def test_incoming_link_nosasl(self):
+ self.incoming_link("NONE")
+
def test_outgoing_link(self):
brokerB = self.amqp_broker()
agentB = BrokerAgent(brokerB.connect())
@@ -246,14 +257,73 @@ class AmqpBrokerTest(BrokerTest):
#send to q on broker B through brokerA
self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
+ def test_reconnect(self):
+ receiver_cmd = ["qpid-receive",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
+ "--timeout=10", "--print-content=true", "--print-headers=false"
+ ]
+ receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+ sender_cmd = ["qpid-send",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+ "--content-stdin", "--send-eos=1"
+ ]
+ sender = self.popen(sender_cmd, stdin=PIPE)
+ sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
+
+
+ batch1 = ["message-%s" % (i+1) for i in range(10000)]
+ for m in batch1:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ self.broker.kill()
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+ batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+ for m in batch2:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ sender.stdin.close()
+
+ last = None
+ m = receiver.stdout.readline().rstrip()
+ while len(m):
+ last = m
+ m = receiver.stdout.readline().rstrip()
+ assert last == "message-20000", (last)
+
""" Create and return a broker with AMQP 1.0 support """
def amqp_broker(self):
assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ self.port_holder = HaPort(self) #reserve port
args = ["--load-module", BrokerTest.amqp_lib,
- "--max-negotiate-time=600000",
+ "--socket-fd=%s" % self.port_holder.fileno,
+ "--listen-disable=tcp",
"--log-enable=trace+:Protocol",
"--log-enable=info+"]
- return BrokerTest.broker(self, args)
+ return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+ def amqp_broker(self, port_holder=None):
+ assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ if port_holder:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--socket-fd=%s" % port_holder.fileno,
+ "--listen-disable=tcp",
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args, port=port_holder.port)
+ else:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args)
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)