summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/interlink_tests.py
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-28 14:56:46 +0000
committerGordon Sim <gsim@apache.org>2013-08-28 14:56:46 +0000
commit0b1ee9f4672f8c21d3622cd971b671bae3b10401 (patch)
tree4e3baec48a73e1f241dda1e81a70006ea714ef17 /qpid/cpp/src/tests/interlink_tests.py
parent01cb164d09b628206335c138eba796b3487c5ea0 (diff)
downloadqpid-python-0b1ee9f4672f8c21d3622cd971b671bae3b10401.tar.gz
QPID-4708: support for reconnect over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518233 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/interlink_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py67
1 files changed, 64 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 0d2b757152..1724607533 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
import traceback
from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from brokertest import *
+from ha_test import HaPort
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
def setUp(self):
BrokerTest.setUp(self)
os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
- self.broker = self.amqp_broker()
+ self.port_holder = HaPort(self)
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
@@ -252,14 +254,73 @@ class AmqpBrokerTest(BrokerTest):
#send to q on broker B through brokerA
self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
+ def test_reconnect(self):
+ receiver_cmd = ["qpid-receive",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
+ "--timeout=10", "--print-content=true", "--print-headers=false"
+ ]
+ receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+ sender_cmd = ["qpid-send",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+ "--content-stdin", "--send-eos=1"
+ ]
+ sender = self.popen(sender_cmd, stdin=PIPE)
+ sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
+
+
+ batch1 = ["message-%s" % (i+1) for i in range(10000)]
+ for m in batch1:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ self.broker.kill()
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+ batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+ for m in batch2:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ sender.stdin.close()
+
+ last = None
+ m = receiver.stdout.readline().rstrip()
+ while len(m):
+ last = m
+ m = receiver.stdout.readline().rstrip()
+ assert last == "message-20000", (last)
+
""" Create and return a broker with AMQP 1.0 support """
def amqp_broker(self):
assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ self.port_holder = HaPort(self) #reserve port
args = ["--load-module", BrokerTest.amqp_lib,
- "--max-negotiate-time=600000",
+ "--socket-fd=%s" % self.port_holder.fileno,
+ "--listen-disable=tcp",
"--log-enable=trace+:Protocol",
"--log-enable=info+"]
- return BrokerTest.broker(self, args)
+ return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+ def amqp_broker(self, port_holder=None):
+ assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ if port_holder:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--socket-fd=%s" % port_holder.fileno,
+ "--listen-disable=tcp",
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args, port=port_holder.port)
+ else:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args)
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)