diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:54 +0000 |
| commit | d30253ae61bb81090ba43b055094dbe5a6d7c98d (patch) | |
| tree | b63d2b8f0277a8297937e6cac6a35b9e49fc9738 /qpid/cpp | |
| parent | 144f3c698bdddf22509691a4f285305e9fd83291 (diff) | |
| download | qpid-python-d30253ae61bb81090ba43b055094dbe5a6d7c98d.tar.gz | |
QPID-4711: translate between structured content in AMQP 0-10 and 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513537 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/bindings/qpid/python/python.i | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 109 |
2 files changed, 95 insertions, 21 deletions
diff --git a/qpid/cpp/bindings/qpid/python/python.i b/qpid/cpp/bindings/qpid/python/python.i index c10ea46000..9158836a2b 100644 --- a/qpid/cpp/bindings/qpid/python/python.i +++ b/qpid/cpp/bindings/qpid/python/python.i @@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) self.setProperty(k, v) def _get_content(self) : + obj = self.getContentObject() + if obj: + return obj if self.content_type == "amqp/list" : return decodeList(self) if self.content_type == "amqp/map" : @@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) elif isinstance(content, list) or isinstance(content, dict) : encode(content, self) else : - # Not a type we can handle. Try setting it anyway, - # although this will probably lead to a swig error - self.setContent(str(content)) + self.setContentObject(content) __swig_getmethods__["content"] = _get_content __swig_setmethods__["content"] = _set_content if _newclass: content = property(_get_content, _set_content) diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index e04d44d2c8..188738287e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -27,6 +27,7 @@ #include "qpid/amqp/MessageEncoder.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/types/Variant.h" +#include "qpid/types/encodings.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <boost/lexical_cast.hpp> @@ -38,6 +39,8 @@ namespace { const std::string EMPTY; const std::string FORWARD_SLASH("/"); +const std::string TEXT_PLAIN("text/plain"); +const std::string SUBJECT_KEY("qpid.subject"); qpid::framing::ReplyTo translate(const std::string address, Broker* broker) { @@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } bool hasTo() const { return getDestination().size() || hasSubject(); } std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } - bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); } - std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; } + bool hasSubject() const + { + if (getDestination().empty()) { + return getApplicationProperties().isSet(SUBJECT_KEY); + } else { + return deliveryProperties && deliveryProperties->hasRoutingKey(); + } + } + std::string getSubject() const + { + if (getDestination().empty()) { + //message was sent to default exchange, routing key is the queue name + return getApplicationProperties().getAsString(SUBJECT_KEY); + } else if (deliveryProperties) { + return deliveryProperties->getRoutingKey(); + } else { + return EMPTY; + } + } bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } @@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties bool hasReplyToGroupId() const { return false; } std::string getReplyToGroupId() const { return EMPTY; } - const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); } + const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); } Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()), deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>()) @@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {} - boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer() { boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t = @@ -161,13 +180,38 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation transfer->getFrames().append(method); transfer->getFrames().append(header); - qpid::amqp::CharSequence body = message->getBody(); - content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); - transfer->getFrames().append(content); - qpid::framing::MessageProperties* props = transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true); - props->setContentLength(body.size); + + if (message->isTypedBody()) { + qpid::types::Variant body = message->getTypedBody(); + std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData(); + if (body.getType() == qpid::types::VAR_MAP) { + qpid::amqp_0_10::MapCodec::encode(body.asMap(), data); + props->setContentType(qpid::amqp_0_10::MapCodec::contentType); + } else if (body.getType() == qpid::types::VAR_LIST) { + qpid::amqp_0_10::ListCodec::encode(body.asList(), data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } else if (body.getType() == qpid::types::VAR_STRING) { + data = body.getString(); + if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) { + props->setContentType(TEXT_PLAIN); + } + } else { + qpid::types::Variant::List container; + container.push_back(body); + qpid::amqp_0_10::ListCodec::encode(container, data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } + transfer->getFrames().append(content); + props->setContentLength(data.size()); + } else { + qpid::amqp::CharSequence body = message->getBody(); + content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); + transfer->getFrames().append(content); + + props->setContentLength(body.size); + } qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; @@ -215,7 +259,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true); dp->setPriority(message->getPriority()); if (message->isPersistent()) dp->setDeliveryMode(2); - if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey()); + if (message->getRoutingKey().size()) { + dp->setRoutingKey(message->getRoutingKey()); + props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey()); + } return transfer.get(); } else { @@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueue& out) Properties_0_10 properties(*transfer); qpid::types::Variant::Map applicationProperties; qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); - std::string content = transfer->getContent(); - size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); - std::vector<char> buffer(size); - qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); - encoder.writeProperties(properties); - encoder.writeApplicationProperties(applicationProperties); - if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); - out.write(&buffer[0], encoder.getPosition()); + if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { + qpid::types::Variant::Map content; + qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { + qpid::types::Variant::List content; + qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else { + std::string content = transfer->getContent(); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); + out.write(&buffer[0], encoder.getPosition()); + } } else { QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); } |
