summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-28 16:26:54 +0000
committerGordon Sim <gsim@apache.org>2013-05-28 16:26:54 +0000
commitd3f7bac7c1f31c5a52032a5874662b6bf0efd5c2 (patch)
tree686c2ac97c383ba1fabdced56c190de8e1b4ec41 /qpid/cpp/src/tests
parent2af5cb678ce7d1bb434e6862ada8d1b3b8acd02d (diff)
downloadqpid-python-d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2.tar.gz
QPID-4713: fix handling of reply to when converting from 1.0 to 0-10 format
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py82
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
2 files changed, 80 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 5c63ac9f82..129283ac24 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -50,13 +50,15 @@ class AmqpBrokerTest(BrokerTest):
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
- def sender(self, config):
+ def sender(self, config, reply_to=None):
cmd = ["qpid-send",
"--broker", config.url,
"--address", config.address,
"--connection-options", "{protocol:%s}" % config.version,
"--content-stdin", "--send-eos=1"
]
+ if reply_to:
+ cmd.append( "--reply-to=%s" % reply_to)
return self.popen(cmd, stdin=PIPE)
def receiver(self, config):
@@ -68,13 +70,31 @@ class AmqpBrokerTest(BrokerTest):
]
return self.popen(cmd, stdout=PIPE)
- def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False):
+ def ready_receiver(self, config):
+ s = self.broker.connect().session()
+ r = s.receiver("readyq; {create:always}")
+ cmd = ["qpid-receive",
+ "--broker", config.url,
+ "--address", config.address,
+ "--connection-options", "{protocol:%r}" % config.version,
+ "--timeout=10", "--ready-address=readyq;{create:always}"
+ ]
+ result = self.popen(cmd, stdout=PIPE)
+ r.fetch(timeout=1) # wait until receiver is actually ready
+ s.acknowledge()
+ s.close()
+ return result
+
+ def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None, wait_for_receiver=False, debug=False):
if debug:
print "sender config is %s" % (send_config or self.default_config)
print "receiver config is %s" % (recv_config or self.default_config)
- sender = self.sender(send_config or self.default_config)
+ sender = self.sender(send_config or self.default_config, reply_to)
sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
- receiver = self.receiver(recv_config or self.default_config)
+ if wait_for_receiver:
+ receiver = self.ready_receiver(recv_config or self.default_config)
+ else:
+ receiver = self.receiver(recv_config or self.default_config)
messages = ["message-%s" % (i+1) for i in range(count)]
for m in messages:
@@ -137,6 +157,60 @@ class AmqpBrokerTest(BrokerTest):
def test_translate_empty_2(self):
self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10"))
+ def request_response(self, reply_to, send_config=None, request_config=None, response_config=None, count=1000, wait_for_receiver=False):
+ rconfig = request_config or self.default_config
+ echo_cmd = ["qpid-receive",
+ "--broker", rconfig.url,
+ "--address=%s" % rconfig.address,
+ "--connection-options={protocol:%s}" % rconfig.version,
+ "--timeout=10", "--print-content=false", "--print-headers=false"
+ ]
+ requests = self.popen(echo_cmd)
+ self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver)
+ requests.wait()
+
+ def request_response_local(self, request_address, response_address, wait_for_receiver=False, request_version="amqp1.0", echo_version="amqp1.0"):
+ self.request_response(response_address, send_config=Config(self.broker, address=request_address, version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version), response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver)
+
+ def test_request_reponse_queue(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2")
+
+ def test_request_reponse_queue_translated1(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_queue_translated2(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10")
+
+ def test_request_reponse_exchange(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True)
+
+ def test_request_reponse_exchange_translated1(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_exchange_translated2(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
+
+ def test_request_reponse_exchange_with_subject(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True)
+
+ def test_request_reponse_exchange_with_subject_translated1(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_exchange_with_subject_translated2(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
+
def test_domain(self):
brokerB = self.amqp_broker()
self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()})
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index d5f498acbd..510e9be42c 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -250,10 +250,9 @@ int main(int argc, char ** argv)
if (s.isNull()) {
s = session.createSender(msg.getReplyTo());
s.setCapacity(opts.capacity);
+ replyTo[msg.getReplyTo().str()] = s;
}
- if (!opts.replyto.empty()) {
- msg.setReplyTo(Address(opts.replyto));
- }
+ msg.setReplyTo(Address(opts.replyto));
s.send(msg);
}
if (opts.receiveRate) {