diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/messaging/amqp/EncodedMessage.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/EncodedMessage.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/EncodedMessage.cpp | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp new file mode 100644 index 0000000000..54de3eae45 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/amqp/Decoder.h" +#include <boost/lexical_cast.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { + +using namespace qpid::amqp; + +EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0) +{ + init(); +} + +EncodedMessage::EncodedMessage() : size(0), data(0) +{ + init(); +} + +EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0) +{ + init(); +} + +void EncodedMessage::init() +{ + //init all CharSequence members + deliveryAnnotations.init(); + messageAnnotations.init(); + userId.init(); + to.init(); + subject.init(); + replyTo.init(); + contentType.init(); + contentEncoding.init(); + groupId.init(); + replyToGroupId.init(); + applicationProperties.init(); + body.init(); + footer.init(); +} + +EncodedMessage::~EncodedMessage() +{ + delete[] data; +} + +size_t EncodedMessage::getSize() const +{ + return size; +} +void EncodedMessage::trim(size_t t) +{ + size = t; +} +void EncodedMessage::resize(size_t s) +{ + delete[] data; + size = s; + data = new char[size]; +} + +char* EncodedMessage::getData() +{ + return data; +} +const char* EncodedMessage::getData() const +{ + return data; +} + +void EncodedMessage::init(qpid::messaging::MessageImpl& impl) +{ + //initial scan of raw data + qpid::amqp::Decoder decoder(data, size); + InitialScan reader(*this, impl); + decoder.read(reader); + bareMessage = reader.getBareMessage(); + if (bareMessage.data && !bareMessage.size) { + bareMessage.size = (data + size) - bareMessage.data; + } + +} +void EncodedMessage::populate(qpid::types::Variant::Map& map) const +{ + //decode application properties + if (applicationProperties) { + qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); + decoder.readMap(map); + } + //add in 'x-amqp-' prefixed values + if (!!firstAcquirer) { + map["x-amqp-first-acquirer"] = firstAcquirer.get(); + } + if (!!deliveryCount) { + map["x-amqp-delivery-count"] = deliveryCount.get(); + } + if (to) { + map["x-amqp-delivery-count"] = to.str(); + } + if (!!absoluteExpiryTime) { + map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); + } + if (!!creationTime) { + map["x-amqp-creation-time"] = creationTime.get(); + } + if (groupId) { + map["x-amqp-group-id"] = groupId.str(); + } + if (!!groupSequence) { + map["x-amqp-qroup-sequence"] = groupSequence.get(); + } + if (replyToGroupId) { + map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); + } + //add in any annotations + if (deliveryAnnotations) { + qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap(); + qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); + decoder.readMap(annotations); + } + if (messageAnnotations) { + qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap(); + qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); + decoder.readMap(annotations); + } +} +qpid::amqp::CharSequence EncodedMessage::getBareMessage() const +{ + return bareMessage; +} + +void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const +{ + a = qpid::messaging::Address(replyTo.str()); +} +void EncodedMessage::getSubject(std::string& s) const +{ + s.assign(subject.data, subject.size); +} +void EncodedMessage::getContentType(std::string& s) const +{ + s.assign(contentType.data, contentType.size); +} +void EncodedMessage::getUserId(std::string& s) const +{ + s.assign(userId.data, userId.size); +} +void EncodedMessage::getMessageId(std::string& s) const +{ + messageId.assign(s); +} +void EncodedMessage::getCorrelationId(std::string& s) const +{ + correlationId.assign(s); +} +void EncodedMessage::getBody(std::string& s) const +{ + s.assign(body.data, body.size); +} + +qpid::amqp::CharSequence EncodedMessage::getBody() const +{ + return body; +} + +bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const +{ + if (!durable) { + if (msg.isDurable()) return true; + } else { + if (durable.get() != msg.isDurable()) return true; + } + + if (!priority) { + if (msg.getPriority() != 4) return true; + } else { + if (priority.get() != msg.getPriority()) return true; + } + + if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) { + return true; + } + + //first-acquirer can't be changed via Message interface as yet + + if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) { + return true; + } + + return false; +} + + +EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m) +{ + //set up defaults as needed: + mi.setPriority(4); +} +//header: +void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; } +void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; } +void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; } +void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; } +void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i) +{ + mi.setRedelivered(i); + em.deliveryCount = i; +} + +//properties: +void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); } +void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); } +void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; } +void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; } +void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; } +void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;} +void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); } +void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); } +void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; } +void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; } +void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; } +void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; } +void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; } +void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; } +void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; } + +void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; } +void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; } +void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; } +void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) +{ + //TODO: how to communicate the type, i.e. descriptor? + em.body = v; +} +void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} +void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; } + +}}} // namespace qpid::messaging::amqp |