diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/amqp/MessageEncoder.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/amqp/MessageEncoder.cpp')
-rw-r--r-- | cpp/src/qpid/amqp/MessageEncoder.cpp | 313 |
1 files changed, 313 insertions, 0 deletions
diff --git a/cpp/src/qpid/amqp/MessageEncoder.cpp b/cpp/src/qpid/amqp/MessageEncoder.cpp new file mode 100644 index 0000000000..852ad29635 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageEncoder.cpp @@ -0,0 +1,313 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +namespace { +size_t optimisable(const MessageEncoder::Header& msg) +{ + if (msg.getDeliveryCount()) return 5; + else if (msg.isFirstAcquirer()) return 4; + else if (msg.hasTtl()) return 3; + else if (msg.getPriority() != 4) return 2; + else if (msg.isDurable()) return 1; + else return 0; +} + +size_t optimisable(const MessageEncoder::Properties& msg) +{ + if (msg.hasReplyToGroupId()) return 13; + else if (msg.hasGroupSequence()) return 12; + else if (msg.hasGroupId()) return 11; + else if (msg.hasCreationTime()) return 10; + else if (msg.hasAbsoluteExpiryTime()) return 9; + else if (msg.hasContentEncoding()) return 8; + else if (msg.hasContentType()) return 7; + else if (msg.hasCorrelationId()) return 6; + else if (msg.hasReplyTo()) return 5; + else if (msg.hasSubject()) return 4; + else if (msg.hasTo()) return 3; + else if (msg.hasUserId()) return 2; + else if (msg.hasMessageId()) return 1; + else return 0; +} +size_t encodedSize(const std::string& s) +{ + size_t total = s.size(); + if (total > 255) total += 4; + else total += 1; + return total; +} +const std::string BINARY("binary"); +} + +void MessageEncoder::writeHeader(const Header& msg) +{ + size_t fields(optimise ? optimisable(msg) : 5); + if (fields) { + void* token = startList8(&qpid::amqp::message::HEADER); + writeBoolean(msg.isDurable()); + if (fields > 1) writeUByte(msg.getPriority()); + + if (msg.getTtl()) writeUInt(msg.getTtl()); + else if (fields > 2) writeNull(); + + if (msg.isFirstAcquirer()) writeBoolean(true); + else if (fields > 3) writeNull(); + + if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount()); + else if (fields > 4) writeNull(); + endList8(fields, token); + } +} + + +void MessageEncoder::writeProperties(const Properties& msg) +{ + size_t fields(optimise ? optimisable(msg) : 13); + if (fields) { + void* token = startList32(&qpid::amqp::message::PROPERTIES); + if (msg.hasMessageId()) writeString(msg.getMessageId()); + else writeNull(); + + if (msg.hasUserId()) writeBinary(msg.getUserId()); + else if (fields > 1) writeNull(); + + if (msg.hasTo()) writeString(msg.getTo()); + else if (fields > 2) writeNull(); + + if (msg.hasSubject()) writeString(msg.getSubject()); + else if (fields > 3) writeNull(); + + if (msg.hasReplyTo()) writeString(msg.getReplyTo()); + else if (fields > 4) writeNull(); + + if (msg.hasCorrelationId()) writeString(msg.getCorrelationId()); + else if (fields > 5) writeNull(); + + if (msg.hasContentType()) writeSymbol(msg.getContentType()); + else if (fields > 6) writeNull(); + + if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding()); + else if (fields > 7) writeNull(); + + if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime()); + else if (fields > 8) writeNull(); + + if (msg.hasCreationTime()) writeLong(msg.getCreationTime()); + else if (fields > 9) writeNull(); + + if (msg.hasGroupId()) writeString(msg.getGroupId()); + else if (fields > 10) writeNull(); + + if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence()); + else if (fields > 11) writeNull(); + + if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId()); + else if (fields > 12) writeNull(); + + endList32(fields, token); + } +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties) +{ + writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255); +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large) +{ + writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large); +} + +void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large) +{ + void* token = large ? startMap32(d) : startMap8(d); + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + writeString(i->first); + switch (i->second.getType()) { + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method) + QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0"); + case qpid::types::VAR_VOID: + writeNull(); + break; + case qpid::types::VAR_BOOL: + writeBoolean(i->second); + break; + case qpid::types::VAR_UINT8: + writeUByte(i->second); + break; + case qpid::types::VAR_UINT16: + writeUShort(i->second); + break; + case qpid::types::VAR_UINT32: + writeUInt(i->second); + break; + case qpid::types::VAR_UINT64: + writeULong(i->second); + break; + case qpid::types::VAR_INT8: + writeByte(i->second); + break; + case qpid::types::VAR_INT16: + writeShort(i->second); + break; + case qpid::types::VAR_INT32: + writeInt(i->second); + break; + case qpid::types::VAR_INT64: + writeULong(i->second); + break; + case qpid::types::VAR_FLOAT: + writeFloat(i->second); + break; + case qpid::types::VAR_DOUBLE: + writeDouble(i->second); + break; + case qpid::types::VAR_STRING: + if (i->second.getEncoding() == BINARY) { + writeBinary(i->second); + } else { + writeString(i->second); + } + break; + case qpid::types::VAR_UUID: + writeUuid(i->second); + break; + } + } + if (large) endMap32(properties.size()*2, token); + else endMap8(properties.size()*2, token); +} + +size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(0); + + //header: + total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/; + if (h.getPriority() != 4) total += 1; + if (h.getDeliveryCount()) total += 4; + if (h.hasTtl()) total += 4; + return total + getEncodedSize(p, ap, d); +} + +size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(0); + + //properties: + total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/; + if (p.hasMessageId()) total += encodedSize(p.getMessageId()); + if (p.hasUserId()) total += encodedSize(p.getUserId()); + if (p.hasTo()) total += encodedSize(p.getTo()); + if (p.hasSubject()) total += encodedSize(p.getSubject()); + if (p.hasReplyTo()) total += encodedSize(p.getReplyTo()); + if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId()); + if (p.hasContentType()) total += encodedSize(p.getContentType()); + if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding()); + if (p.hasAbsoluteExpiryTime()) total += 8; + if (p.hasCreationTime()) total += 8; + if (p.hasGroupId()) total += encodedSize(p.getGroupId()); + if (p.hasGroupSequence()) total += 4; + if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId()); + + + //application-properties: + total += 3/*descriptor*/ + getEncodedSize(ap, true); + //body: + if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d); + + return total; +} + +size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map) +{ + size_t total = 0; + for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + total += 1/*code*/ + encodedSize(i->first); + + switch (i->second.getType()) { + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + case qpid::types::VAR_VOID: + case qpid::types::VAR_BOOL: + total += 1; + break; + + case qpid::types::VAR_UINT8: + case qpid::types::VAR_INT8: + total += 2; + break; + + case qpid::types::VAR_UINT16: + case qpid::types::VAR_INT16: + total += 3; + break; + + case qpid::types::VAR_UINT32: + case qpid::types::VAR_INT32: + case qpid::types::VAR_FLOAT: + total += 5; + break; + + case qpid::types::VAR_UINT64: + case qpid::types::VAR_INT64: + case qpid::types::VAR_DOUBLE: + total += 9; + break; + + case qpid::types::VAR_UUID: + total += 17; + break; + + case qpid::types::VAR_STRING: + total += 1/*code*/ + encodedSize(i->second); + break; + } + } + return total; +} + + +size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap) +{ + size_t total = getEncodedSizeForElements(map); + + //its not just the count that determines whether we can use a small map, but the aggregate size: + if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/; + else total += 1/*size*/ + 1/*count*/; + + total += 1 /*code for map itself*/; + + return total; +} +}} // namespace qpid::amqp |