diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-28 14:56:46 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-28 14:56:46 +0000 |
| commit | 0b1ee9f4672f8c21d3622cd971b671bae3b10401 (patch) | |
| tree | 4e3baec48a73e1f241dda1e81a70006ea714ef17 /qpid/cpp/src/tests/interlink_tests.py | |
| parent | 01cb164d09b628206335c138eba796b3487c5ea0 (diff) | |
| download | qpid-python-0b1ee9f4672f8c21d3622cd971b671bae3b10401.tar.gz | |
QPID-4708: support for reconnect over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518233 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/interlink_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/interlink_tests.py | 67 |
1 files changed, 64 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 0d2b757152..1724607533 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from brokertest import * +from ha_test import HaPort from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent, BrokerObject @@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest): def setUp(self): BrokerTest.setUp(self) os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib) - self.broker = self.amqp_broker() + self.port_holder = HaPort(self) + self.broker = self.amqp_broker(port_holder=self.port_holder) self.default_config = Config(self.broker) self.agent = BrokerAgent(self.broker.connect()) @@ -252,14 +254,73 @@ class AmqpBrokerTest(BrokerTest): #send to q on broker B through brokerA self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB)) + def test_reconnect(self): + receiver_cmd = ["qpid-receive", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}", + "--timeout=10", "--print-content=true", "--print-headers=false" + ] + receiver = self.popen(receiver_cmd, stdout=PIPE) + + sender_cmd = ["qpid-send", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}", + "--content-stdin", "--send-eos=1" + ] + sender = self.popen(sender_cmd, stdin=PIPE) + sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112 + + + batch1 = ["message-%s" % (i+1) for i in range(10000)] + for m in batch1: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + self.broker.kill() + self.broker = self.amqp_broker(port_holder=self.port_holder) + + batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)] + for m in batch2: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + sender.stdin.close() + + last = None + m = receiver.stdout.readline().rstrip() + while len(m): + last = m + m = receiver.stdout.readline().rstrip() + assert last == "message-20000", (last) + """ Create and return a broker with AMQP 1.0 support """ def amqp_broker(self): assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + self.port_holder = HaPort(self) #reserve port args = ["--load-module", BrokerTest.amqp_lib, - "--max-negotiate-time=600000", + "--socket-fd=%s" % self.port_holder.fileno, + "--listen-disable=tcp", "--log-enable=trace+:Protocol", "--log-enable=info+"] - return BrokerTest.broker(self, args) + return BrokerTest.broker(self, args, port=self.port_holder.port) + + def amqp_broker(self, port_holder=None): + assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + if port_holder: + args = ["--load-module", BrokerTest.amqp_lib, + "--socket-fd=%s" % port_holder.fileno, + "--listen-disable=tcp", + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args, port=port_holder.port) + else: + args = ["--load-module", BrokerTest.amqp_lib, + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args) + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
