summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-28 16:26:54 +0000
committerGordon Sim <gsim@apache.org>2013-05-28 16:26:54 +0000
commitd3f7bac7c1f31c5a52032a5874662b6bf0efd5c2 (patch)
tree686c2ac97c383ba1fabdced56c190de8e1b4ec41 /qpid/cpp/src
parent2af5cb678ce7d1bb434e6862ada8d1b3b8acd02d (diff)
downloadqpid-python-d3f7bac7c1f31c5a52032a5874662b6bf0efd5c2.tar.gz
QPID-4713: fix handling of reply to when converting from 1.0 to 0-10 format
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp8
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py82
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
6 files changed, 116 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index 16f0cdc39f..91008d4075 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m)
{
- qpid::broker::amqp::Translation t(m);
+ qpid::broker::amqp::Translation t(m, &broker);
return t.getTransfer();
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index 3191ff0a8a..846deb92f5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/broker/Broker.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -38,10 +39,30 @@ namespace {
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
+qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
+{
+ size_t i = address.find(FORWARD_SLASH);
+ if (i == std::string::npos) {
+ //is it a queue or an exchange?
+ if (broker && broker->getQueues().find(address)) {
+ return qpid::framing::ReplyTo(EMPTY, address);
+ } else if (broker && broker->getExchanges().find(address)) {
+ return qpid::framing::ReplyTo(address, EMPTY);
+ } else {
+ return qpid::framing::ReplyTo();
+ }
+ } else {
+ return qpid::framing::ReplyTo(i > 0 ? address.substr(0, i) : EMPTY, (i+1) < address.size() ? address.substr(i+1) : EMPTY);
+ }
+}
+qpid::framing::ReplyTo translate(const qpid::amqp::CharSequence& address, Broker* broker)
+{
+ return translate(std::string(address.data, address.size), broker);
+}
std::string translate(const qpid::framing::ReplyTo r)
{
- if (r.hasExchange()) {
- if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
+ if (r.getExchange().size()) {
+ if (r.getRoutingKey().size()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
else return r.getExchange();
} else return r.getRoutingKey();
}
@@ -115,7 +136,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
};
}
-Translation::Translation(const qpid::broker::Message& m) : original(m) {}
+Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
@@ -175,10 +196,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
break;
}
- // TODO: ReplyTo - there is no way to reliably determine
- // the type of the node from just its name, unless we
- // query the brokers registries
-
+ if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker));
if (message->getContentType()) props->setContentType(translate(message->getContentType()));
if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
props->setUserId(message->getUserId());
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.h b/qpid/cpp/src/qpid/broker/amqp/Translation.h
index 7591f45b2a..47cb69d0ba 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.h
@@ -25,6 +25,7 @@
namespace qpid {
namespace broker {
+class Broker;
class Message;
namespace amqp_0_10 {
class MessageTransfer;
@@ -38,7 +39,7 @@ class OutgoingFromQueue;
class Translation
{
public:
- Translation(const qpid::broker::Message& message);
+ Translation(const qpid::broker::Message& message, Broker* broker = 0);
/**
* @returns a pointer to an AMQP 0-10 message transfer suitable
@@ -52,6 +53,7 @@ class Translation
void write(OutgoingFromQueue&);
private:
const qpid::broker::Message& original;
+ Broker* broker;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 0b3de7e680..aeb89d145c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -134,6 +134,7 @@ class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
const qpid::messaging::MessageImpl& msg;
};
const std::string EMPTY;
+const std::string FORWARD_SLASH("/");
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
@@ -185,7 +186,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
std::string getReplyTo() const
{
- return msg.getReplyTo().str();
+ Address a = msg.getReplyTo();
+ if (a.getSubject().size()) {
+ return a.getName() + FORWARD_SLASH + a.getSubject();
+ } else {
+ return a.getName();
+ }
}
bool hasCorrelationId() const
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 5c63ac9f82..129283ac24 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -50,13 +50,15 @@ class AmqpBrokerTest(BrokerTest):
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
- def sender(self, config):
+ def sender(self, config, reply_to=None):
cmd = ["qpid-send",
"--broker", config.url,
"--address", config.address,
"--connection-options", "{protocol:%s}" % config.version,
"--content-stdin", "--send-eos=1"
]
+ if reply_to:
+ cmd.append( "--reply-to=%s" % reply_to)
return self.popen(cmd, stdin=PIPE)
def receiver(self, config):
@@ -68,13 +70,31 @@ class AmqpBrokerTest(BrokerTest):
]
return self.popen(cmd, stdout=PIPE)
- def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False):
+ def ready_receiver(self, config):
+ s = self.broker.connect().session()
+ r = s.receiver("readyq; {create:always}")
+ cmd = ["qpid-receive",
+ "--broker", config.url,
+ "--address", config.address,
+ "--connection-options", "{protocol:%r}" % config.version,
+ "--timeout=10", "--ready-address=readyq;{create:always}"
+ ]
+ result = self.popen(cmd, stdout=PIPE)
+ r.fetch(timeout=1) # wait until receiver is actually ready
+ s.acknowledge()
+ s.close()
+ return result
+
+ def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None, wait_for_receiver=False, debug=False):
if debug:
print "sender config is %s" % (send_config or self.default_config)
print "receiver config is %s" % (recv_config or self.default_config)
- sender = self.sender(send_config or self.default_config)
+ sender = self.sender(send_config or self.default_config, reply_to)
sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
- receiver = self.receiver(recv_config or self.default_config)
+ if wait_for_receiver:
+ receiver = self.ready_receiver(recv_config or self.default_config)
+ else:
+ receiver = self.receiver(recv_config or self.default_config)
messages = ["message-%s" % (i+1) for i in range(count)]
for m in messages:
@@ -137,6 +157,60 @@ class AmqpBrokerTest(BrokerTest):
def test_translate_empty_2(self):
self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10"))
+ def request_response(self, reply_to, send_config=None, request_config=None, response_config=None, count=1000, wait_for_receiver=False):
+ rconfig = request_config or self.default_config
+ echo_cmd = ["qpid-receive",
+ "--broker", rconfig.url,
+ "--address=%s" % rconfig.address,
+ "--connection-options={protocol:%s}" % rconfig.version,
+ "--timeout=10", "--print-content=false", "--print-headers=false"
+ ]
+ requests = self.popen(echo_cmd)
+ self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver)
+ requests.wait()
+
+ def request_response_local(self, request_address, response_address, wait_for_receiver=False, request_version="amqp1.0", echo_version="amqp1.0"):
+ self.request_response(response_address, send_config=Config(self.broker, address=request_address, version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version), response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver)
+
+ def test_request_reponse_queue(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2")
+
+ def test_request_reponse_queue_translated1(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_queue_translated2(self):
+ self.agent.create("queue", "q1")
+ self.agent.create("queue", "q2")
+ self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10")
+
+ def test_request_reponse_exchange(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True)
+
+ def test_request_reponse_exchange_translated1(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_exchange_translated2(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
+
+ def test_request_reponse_exchange_with_subject(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True)
+
+ def test_request_reponse_exchange_with_subject_translated1(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp0-10", echo_version="amqp1.0")
+
+ def test_request_reponse_exchange_with_subject_translated2(self):
+ self.agent.create("queue", "q1")
+ self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True, request_version="amqp1.0", echo_version="amqp0-10")
+
def test_domain(self):
brokerB = self.amqp_broker()
self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()})
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index d5f498acbd..510e9be42c 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -250,10 +250,9 @@ int main(int argc, char ** argv)
if (s.isNull()) {
s = session.createSender(msg.getReplyTo());
s.setCapacity(opts.capacity);
+ replyTo[msg.getReplyTo().str()] = s;
}
- if (!opts.replyto.empty()) {
- msg.setReplyTo(Address(opts.replyto));
- }
+ msg.setReplyTo(Address(opts.replyto));
s.send(msg);
}
if (opts.receiveRate) {