diff options
| author | Gordon Sim <gsim@apache.org> | 2013-05-28 16:26:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-05-28 16:26:54 +0000 |
| commit | d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2 (patch) | |
| tree | 686c2ac97c383ba1fabdced56c190de8e1b4ec41 /qpid/cpp/src/tests/interlink_tests.py | |
| parent | 2af5cb678ce7d1bb434e6862ada8d1b3b8acd02d (diff) | |
| download | qpid-python-d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2.tar.gz | |
QPID-4713: fix handling of reply to when converting from 1.0 to 0-10 format
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/interlink_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/interlink_tests.py | 82 |
1 files changed, 78 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 5c63ac9f82..129283ac24 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -50,13 +50,15 @@ class AmqpBrokerTest(BrokerTest): self.default_config = Config(self.broker) self.agent = BrokerAgent(self.broker.connect()) - def sender(self, config): + def sender(self, config, reply_to=None): cmd = ["qpid-send", "--broker", config.url, "--address", config.address, "--connection-options", "{protocol:%s}" % config.version, "--content-stdin", "--send-eos=1" ] + if reply_to: + cmd.append( "--reply-to=%s" % reply_to) return self.popen(cmd, stdin=PIPE) def receiver(self, config): @@ -68,13 +70,31 @@ class AmqpBrokerTest(BrokerTest): ] return self.popen(cmd, stdout=PIPE) - def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False): + def ready_receiver(self, config): + s = self.broker.connect().session() + r = s.receiver("readyq; {create:always}") + cmd = ["qpid-receive", + "--broker", config.url, + "--address", config.address, + "--connection-options", "{protocol:%r}" % config.version, + "--timeout=10", "--ready-address=readyq;{create:always}" + ] + result = self.popen(cmd, stdout=PIPE) + r.fetch(timeout=1) # wait until receiver is actually ready + s.acknowledge() + s.close() + return result + + def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None, wait_for_receiver=False, debug=False): if debug: print "sender config is %s" % (send_config or self.default_config) print "receiver config is %s" % (recv_config or self.default_config) - sender = self.sender(send_config or self.default_config) + sender = self.sender(send_config or self.default_config, reply_to) sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112 - receiver = self.receiver(recv_config or self.default_config) + if wait_for_receiver: + receiver = self.ready_receiver(recv_config or self.default_config) + else: + receiver = self.receiver(recv_config or self.default_config) messages = ["message-%s" % (i+1) for i in range(count)] for m in messages: @@ -137,6 +157,60 @@ class AmqpBrokerTest(BrokerTest): def test_translate_empty_2(self): self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10")) + def request_response(self, reply_to, send_config=None, request_config=None, response_config=None, count=1000, wait_for_receiver=False): + rconfig = request_config or self.default_config + echo_cmd = ["qpid-receive", + "--broker", rconfig.url, + "--address=%s" % rconfig.address, + "--connection-options={protocol:%s}" % rconfig.version, + "--timeout=10", "--print-content=false", "--print-headers=false" + ] + requests = self.popen(echo_cmd) + self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver) + requests.wait() + + def request_response_local(self, request_address, response_address, wait_for_receiver=False, request_version="amqp1.0", echo_version="amqp1.0"): + self.request_response(response_address, send_config=Config(self.broker, address=request_address, version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version), response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver) + + def test_request_reponse_queue(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2") + + def test_request_reponse_queue_translated1(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_queue_translated2(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10") + + def test_request_reponse_exchange(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True) + + def test_request_reponse_exchange_translated1(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_exchange_translated2(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10") + + def test_request_reponse_exchange_with_subject(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True) + + def test_request_reponse_exchange_with_subject_translated1(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_exchange_with_subject_translated2(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10") + def test_domain(self): brokerB = self.amqp_broker() self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()}) |
