summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-13 15:06:54 +0000
committerGordon Sim <gsim@apache.org>2013-08-13 15:06:54 +0000
commitd30253ae61bb81090ba43b055094dbe5a6d7c98d (patch)
treeb63d2b8f0277a8297937e6cac6a35b9e49fc9738 /qpid/cpp
parent144f3c698bdddf22509691a4f285305e9fd83291 (diff)
downloadqpid-python-d30253ae61bb81090ba43b055094dbe5a6d7c98d.tar.gz
QPID-4711: translate between structured content in AMQP 0-10 and 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513537 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/bindings/qpid/python/python.i7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp109
2 files changed, 95 insertions, 21 deletions
diff --git a/qpid/cpp/bindings/qpid/python/python.i b/qpid/cpp/bindings/qpid/python/python.i
index c10ea46000..9158836a2b 100644
--- a/qpid/cpp/bindings/qpid/python/python.i
+++ b/qpid/cpp/bindings/qpid/python/python.i
@@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
self.setProperty(k, v)
def _get_content(self) :
+ obj = self.getContentObject()
+ if obj:
+ return obj
if self.content_type == "amqp/list" :
return decodeList(self)
if self.content_type == "amqp/map" :
@@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
elif isinstance(content, list) or isinstance(content, dict) :
encode(content, self)
else :
- # Not a type we can handle. Try setting it anyway,
- # although this will probably lead to a swig error
- self.setContent(str(content))
+ self.setContentObject(content)
__swig_getmethods__["content"] = _get_content
__swig_setmethods__["content"] = _set_content
if _newclass: content = property(_get_content, _set_content)
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index e04d44d2c8..188738287e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -27,6 +27,7 @@
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
@@ -38,6 +39,8 @@ namespace {
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
+const std::string TEXT_PLAIN("text/plain");
+const std::string SUBJECT_KEY("qpid.subject");
qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
{
@@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
bool hasTo() const { return getDestination().size() || hasSubject(); }
std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
- bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
- std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+ bool hasSubject() const
+ {
+ if (getDestination().empty()) {
+ return getApplicationProperties().isSet(SUBJECT_KEY);
+ } else {
+ return deliveryProperties && deliveryProperties->hasRoutingKey();
+ }
+ }
+ std::string getSubject() const
+ {
+ if (getDestination().empty()) {
+ //message was sent to default exchange, routing key is the queue name
+ return getApplicationProperties().getAsString(SUBJECT_KEY);
+ } else if (deliveryProperties) {
+ return deliveryProperties->getRoutingKey();
+ } else {
+ return EMPTY;
+ }
+ }
bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
@@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
bool hasReplyToGroupId() const { return false; }
std::string getReplyToGroupId() const { return EMPTY; }
- const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+ const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); }
Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
@@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
-
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
@@ -161,13 +180,38 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
transfer->getFrames().append(method);
transfer->getFrames().append(header);
- qpid::amqp::CharSequence body = message->getBody();
- content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
- transfer->getFrames().append(content);
-
qpid::framing::MessageProperties* props =
transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
- props->setContentLength(body.size);
+
+ if (message->isTypedBody()) {
+ qpid::types::Variant body = message->getTypedBody();
+ std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
+ if (body.getType() == qpid::types::VAR_MAP) {
+ qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
+ props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_LIST) {
+ qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_STRING) {
+ data = body.getString();
+ if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
+ props->setContentType(TEXT_PLAIN);
+ }
+ } else {
+ qpid::types::Variant::List container;
+ container.push_back(body);
+ qpid::amqp_0_10::ListCodec::encode(container, data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ }
+ transfer->getFrames().append(content);
+ props->setContentLength(data.size());
+ } else {
+ qpid::amqp::CharSequence body = message->getBody();
+ content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+ transfer->getFrames().append(content);
+
+ props->setContentLength(body.size);
+ }
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
@@ -215,7 +259,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
dp->setPriority(message->getPriority());
if (message->isPersistent()) dp->setDeliveryMode(2);
- if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+ if (message->getRoutingKey().size()) {
+ dp->setRoutingKey(message->getRoutingKey());
+ props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
+ }
return transfer.get();
} else {
@@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueue& out)
Properties_0_10 properties(*transfer);
qpid::types::Variant::Map applicationProperties;
qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
- std::string content = transfer->getContent();
- size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
- std::vector<char> buffer(size);
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeProperties(properties);
- encoder.writeApplicationProperties(applicationProperties);
- if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
- out.write(&buffer[0], encoder.getPosition());
+ if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ qpid::types::Variant::Map content;
+ qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ qpid::types::Variant::List content;
+ qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else {
+ std::string content = transfer->getContent();
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
+ out.write(&buffer[0], encoder.getPosition());
+ }
} else {
QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
}