diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/amqp/Translation.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Translation.cpp')
-rw-r--r-- | cpp/src/qpid/broker/amqp/Translation.cpp | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/Translation.cpp b/cpp/src/qpid/broker/amqp/Translation.cpp new file mode 100644 index 0000000000..ca2094b965 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Translation.cpp @@ -0,0 +1,241 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Translation.h" +#include "qpid/broker/amqp/Outgoing.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/types/Variant.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { +namespace amqp { +namespace { + +const std::string EMPTY; +const std::string FORWARD_SLASH("/"); + +std::string translate(const qpid::framing::ReplyTo r) +{ + if (r.hasExchange()) { + if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey(); + else return r.getExchange(); + } else return r.getRoutingKey(); +} +std::string translate(const qpid::amqp::CharSequence& chars) +{ + if (chars.data && chars.size) return std::string(chars.data, chars.size); + else return EMPTY; +} +bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars) +{ + if (chars.data && chars.size) { + if (chars.size == 16) { + m.setMessageId(qpid::framing::Uuid(chars.data)); + return true; + } else { + std::istringstream in(translate(chars)); + qpid::framing::Uuid uuid; + in >> uuid; + if (!in.fail()) { + m.setMessageId(uuid); + return true; + } + } + } + return false; +} +class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties +{ + public: + bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); } + std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; } + bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); } + std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } + bool hasTo() const { return getDestination().size() || hasSubject(); } + std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } + bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); } + std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; } + bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } + std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } + bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } + std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; } + bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); } + std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; } + bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); } + std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; } + bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); } + int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; } + bool hasCreationTime() const { return false; } + int64_t getCreationTime() const { return 0; } + bool hasGroupId() const {return false; } + std::string getGroupId() const { return EMPTY; } + bool hasGroupSequence() const { return false; } + uint32_t getGroupSequence() const { return 0; } + bool hasReplyToGroupId() const { return false; } + std::string getReplyToGroupId() const { return EMPTY; } + + const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); } + Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), + messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()), + deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>()) + {} + private: + const qpid::broker::amqp_0_10::MessageTransfer& transfer; + const qpid::framing::MessageProperties* messageProperties; + const qpid::framing::DeliveryProperties* deliveryProperties; + + std::string getDestination() const + { + return transfer.getMethod<qpid::framing::MessageTransferBody>()->getDestination(); + } +}; +} + +Translation::Translation(const qpid::broker::Message& m) : original(m) {} + + +boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer() +{ + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t = + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding())); + if (t) { + return t;//no translation required + } else { + const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); + if (message) { + //translate 1.0 message into 0-10 + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0))); + qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody())); + qpid::framing::AMQFrame content((qpid::framing::AMQContentBody())); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + transfer->getFrames().append(method); + transfer->getFrames().append(header); + + qpid::amqp::CharSequence body = message->getBody(); + content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); + transfer->getFrames().append(content); + + qpid::framing::MessageProperties* props = + transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true); + props->setContentLength(body.size); + + qpid::amqp::MessageId mid = message->getMessageId(); + qpid::framing::Uuid uuid; + switch (mid.type) { + case qpid::amqp::MessageId::UUID: + case qpid::amqp::MessageId::BYTES: + if (mid.value.bytes.size == 0) break; + if (setMessageId(*props, mid.value.bytes)) break; + case qpid::amqp::MessageId::ULONG: + QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID"); + break; + } + + qpid::amqp::MessageId cid = message->getCorrelationId(); + switch (cid.type) { + case qpid::amqp::MessageId::UUID: + assert(cid.value.bytes.size = 16); + props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str()); + break; + case qpid::amqp::MessageId::BYTES: + if (cid.value.bytes.size) { + props->setCorrelationId(translate(cid.value.bytes)); + } + break; + case qpid::amqp::MessageId::ULONG: + props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong)); + break; + } + // TODO: ReplyTo - there is no way to reliably determine + // the type of the node from just its name, unless we + // query the brokers registries + + if (message->getContentType()) props->setContentType(translate(message->getContentType())); + if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); + props->setUserId(message->getUserId()); + // TODO: FieldTable applicationHeaders; + qpid::amqp::CharSequence ap = message->getApplicationProperties(); + if (ap) { + qpid::amqp::Decoder d(ap.data, ap.size); + qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders()); + } + + qpid::framing::DeliveryProperties* dp = + transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true); + dp->setPriority(message->getPriority()); + if (message->isPersistent()) dp->setDeliveryMode(2); + if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey()); + + return transfer.get(); + } else { + throw qpid::Exception("Could not write message data in AMQP 0-10 format"); + } + } +} + +void Translation::write(Outgoing& out) +{ + const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); + if (message) { + //write annotations + //TODO: merge in any newly added annotations + qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations(); + qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations(); + if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size); + if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size); + //write bare message + qpid::amqp::CharSequence bareMessage = message->getBareMessage(); + if (bareMessage.size) out.write(bareMessage.data, bareMessage.size); + //write footer: + qpid::amqp::CharSequence footer = message->getFooter(); + if (footer.size) out.write(footer.data, footer.size); + } else { + const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()); + if (transfer) { + Properties_0_10 properties(*transfer); + qpid::types::Variant::Map applicationProperties; + qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); + std::string content = transfer->getContent(); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeBinary(content, &qpid::amqp::message::DATA); + out.write(&buffer[0], encoder.getPosition()); + } else { + QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); + } + } +} + +}}} // namespace qpid::broker::amqp |