diff options
| author | Gordon Sim <gsim@apache.org> | 2013-05-28 16:26:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-05-28 16:26:54 +0000 |
| commit | d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2 (patch) | |
| tree | 686c2ac97c383ba1fabdced56c190de8e1b4ec41 /qpid/cpp/src | |
| parent | 2af5cb678ce7d1bb434e6862ada8d1b3b8acd02d (diff) | |
| download | qpid-python-d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2.tar.gz | |
QPID-4713: fix handling of reply to when converting from 1.0 to 0-10 format
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 8 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/interlink_tests.py | 82 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 5 |
6 files changed, 116 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 16f0cdc39f..91008d4075 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m) { - qpid::broker::amqp::Translation t(m); + qpid::broker::amqp::Translation t(m, &broker); return t.getTransfer(); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 3191ff0a8a..846deb92f5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/broker/Broker.h" #include "qpid/amqp/Decoder.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" @@ -38,10 +39,30 @@ namespace { const std::string EMPTY; const std::string FORWARD_SLASH("/"); +qpid::framing::ReplyTo translate(const std::string address, Broker* broker) +{ + size_t i = address.find(FORWARD_SLASH); + if (i == std::string::npos) { + //is it a queue or an exchange? + if (broker && broker->getQueues().find(address)) { + return qpid::framing::ReplyTo(EMPTY, address); + } else if (broker && broker->getExchanges().find(address)) { + return qpid::framing::ReplyTo(address, EMPTY); + } else { + return qpid::framing::ReplyTo(); + } + } else { + return qpid::framing::ReplyTo(i > 0 ? address.substr(0, i) : EMPTY, (i+1) < address.size() ? address.substr(i+1) : EMPTY); + } +} +qpid::framing::ReplyTo translate(const qpid::amqp::CharSequence& address, Broker* broker) +{ + return translate(std::string(address.data, address.size), broker); +} std::string translate(const qpid::framing::ReplyTo r) { - if (r.hasExchange()) { - if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey(); + if (r.getExchange().size()) { + if (r.getRoutingKey().size()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey(); else return r.getExchange(); } else return r.getRoutingKey(); } @@ -115,7 +136,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties }; } -Translation::Translation(const qpid::broker::Message& m) : original(m) {} +Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {} boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer() @@ -175,10 +196,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong)); break; } - // TODO: ReplyTo - there is no way to reliably determine - // the type of the node from just its name, unless we - // query the brokers registries - + if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker)); if (message->getContentType()) props->setContentType(translate(message->getContentType())); if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); props->setUserId(message->getUserId()); diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.h b/qpid/cpp/src/qpid/broker/amqp/Translation.h index 7591f45b2a..47cb69d0ba 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.h +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.h @@ -25,6 +25,7 @@ namespace qpid { namespace broker { +class Broker; class Message; namespace amqp_0_10 { class MessageTransfer; @@ -38,7 +39,7 @@ class OutgoingFromQueue; class Translation { public: - Translation(const qpid::broker::Message& message); + Translation(const qpid::broker::Message& message, Broker* broker = 0); /** * @returns a pointer to an AMQP 0-10 message transfer suitable @@ -52,6 +53,7 @@ class Translation void write(OutgoingFromQueue&); private: const qpid::broker::Message& original; + Broker* broker; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 0b3de7e680..aeb89d145c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -134,6 +134,7 @@ class HeaderAdapter : public qpid::amqp::MessageEncoder::Header const qpid::messaging::MessageImpl& msg; }; const std::string EMPTY; +const std::string FORWARD_SLASH("/"); class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties { @@ -185,7 +186,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties std::string getReplyTo() const { - return msg.getReplyTo().str(); + Address a = msg.getReplyTo(); + if (a.getSubject().size()) { + return a.getName() + FORWARD_SLASH + a.getSubject(); + } else { + return a.getName(); + } } bool hasCorrelationId() const diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 5c63ac9f82..129283ac24 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -50,13 +50,15 @@ class AmqpBrokerTest(BrokerTest): self.default_config = Config(self.broker) self.agent = BrokerAgent(self.broker.connect()) - def sender(self, config): + def sender(self, config, reply_to=None): cmd = ["qpid-send", "--broker", config.url, "--address", config.address, "--connection-options", "{protocol:%s}" % config.version, "--content-stdin", "--send-eos=1" ] + if reply_to: + cmd.append( "--reply-to=%s" % reply_to) return self.popen(cmd, stdin=PIPE) def receiver(self, config): @@ -68,13 +70,31 @@ class AmqpBrokerTest(BrokerTest): ] return self.popen(cmd, stdout=PIPE) - def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False): + def ready_receiver(self, config): + s = self.broker.connect().session() + r = s.receiver("readyq; {create:always}") + cmd = ["qpid-receive", + "--broker", config.url, + "--address", config.address, + "--connection-options", "{protocol:%r}" % config.version, + "--timeout=10", "--ready-address=readyq;{create:always}" + ] + result = self.popen(cmd, stdout=PIPE) + r.fetch(timeout=1) # wait until receiver is actually ready + s.acknowledge() + s.close() + return result + + def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None, wait_for_receiver=False, debug=False): if debug: print "sender config is %s" % (send_config or self.default_config) print "receiver config is %s" % (recv_config or self.default_config) - sender = self.sender(send_config or self.default_config) + sender = self.sender(send_config or self.default_config, reply_to) sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112 - receiver = self.receiver(recv_config or self.default_config) + if wait_for_receiver: + receiver = self.ready_receiver(recv_config or self.default_config) + else: + receiver = self.receiver(recv_config or self.default_config) messages = ["message-%s" % (i+1) for i in range(count)] for m in messages: @@ -137,6 +157,60 @@ class AmqpBrokerTest(BrokerTest): def test_translate_empty_2(self): self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10")) + def request_response(self, reply_to, send_config=None, request_config=None, response_config=None, count=1000, wait_for_receiver=False): + rconfig = request_config or self.default_config + echo_cmd = ["qpid-receive", + "--broker", rconfig.url, + "--address=%s" % rconfig.address, + "--connection-options={protocol:%s}" % rconfig.version, + "--timeout=10", "--print-content=false", "--print-headers=false" + ] + requests = self.popen(echo_cmd) + self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver) + requests.wait() + + def request_response_local(self, request_address, response_address, wait_for_receiver=False, request_version="amqp1.0", echo_version="amqp1.0"): + self.request_response(response_address, send_config=Config(self.broker, address=request_address, version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version), response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver) + + def test_request_reponse_queue(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2") + + def test_request_reponse_queue_translated1(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_queue_translated2(self): + self.agent.create("queue", "q1") + self.agent.create("queue", "q2") + self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10") + + def test_request_reponse_exchange(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True) + + def test_request_reponse_exchange_translated1(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_exchange_translated2(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10") + + def test_request_reponse_exchange_with_subject(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True) + + def test_request_reponse_exchange_with_subject_translated1(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0") + + def test_request_reponse_exchange_with_subject_translated2(self): + self.agent.create("queue", "q1") + self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10") + def test_domain(self): brokerB = self.amqp_broker() self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()}) diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index d5f498acbd..510e9be42c 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -250,10 +250,9 @@ int main(int argc, char ** argv) if (s.isNull()) { s = session.createSender(msg.getReplyTo()); s.setCapacity(opts.capacity); + replyTo[msg.getReplyTo().str()] = s; } - if (!opts.replyto.empty()) { - msg.setReplyTo(Address(opts.replyto)); - } + msg.setReplyTo(Address(opts.replyto)); s.send(msg); } if (opts.receiveRate) { |
