/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"

namespace qpid {
namespace amqp {

namespace {

/**
 * Returns how many leading fields of the header list have to be
 * written when trailing unset fields are dropped: the 1-based position
 * of the last field that is set (delivery-count=5, first-acquirer=4,
 * ttl=3, priority other than 4 =2, durable=1), or 0 if none is set.
 */
size_t optimisable(const MessageEncoder::Header& msg)
{
    if (msg.getDeliveryCount()) return 5;
    else if (msg.isFirstAcquirer()) return 4;
    else if (msg.hasTtl()) return 3;
    else if (msg.getPriority() != 4) return 2;
    else if (msg.isDurable()) return 1;
    else return 0;
}

/**
 * Returns how many leading fields of the properties list have to be
 * written when trailing unset fields are dropped: the 1-based position
 * of the last field that is set (reply-to-group-id=13 down to
 * message-id=1), or 0 if none is set.
 */
size_t optimisable(const MessageEncoder::Properties& msg)
{
    if (msg.hasReplyToGroupId()) return 13;
    else if (msg.hasGroupSequence()) return 12;
    else if (msg.hasGroupId()) return 11;
    else if (msg.hasCreationTime()) return 10;
    else if (msg.hasAbsoluteExpiryTime()) return 9;
    else if (msg.hasContentEncoding()) return 8;
    else if (msg.hasContentType()) return 7;
    else if (msg.hasCorrelationId()) return 6;
    else if (msg.hasReplyTo()) return 5;
    else if (msg.hasSubject()) return 4;
    else if (msg.hasTo()) return 3;
    else if (msg.hasUserId()) return 2;
    else if (msg.hasMessageId()) return 1;
    else return 0;
}

/**
 * Size estimate for a variable-width value, excluding its type code:
 * the data itself plus a 1-byte length prefix, or a 4-byte length
 * prefix when the data is longer than 255 bytes.
 */
size_t encodedSize(const std::string& s)
{
    size_t total = s.size();
    if (total > 255) total += 4;
    else total += 1;
    return total;
}

/** Variant encoding name that selects binary rather than string output. */
const std::string BINARY("binary");

}

/**
 * Writes the header section as a described list.
 *
 * When optimisation is enabled only the fields up to the last one that
 * is set are written, and nothing at all is written if no field is set.
 * Otherwise all five fields are written, with nulls for unset ones.
 */
void MessageEncoder::writeHeader(const Header& msg)
{
    size_t fields(optimise ? optimisable(msg) : 5);
    if (fields) {
        void* token = startList8(&qpid::amqp::message::HEADER);
        writeBoolean(msg.isDurable());
        if (fields > 1) writeUByte(msg.getPriority());

        // An unset field is written as null only when a later field
        // follows it; trailing unset fields are simply omitted.
        if (msg.getTtl()) writeUInt(msg.getTtl());
        else if (fields > 2) writeNull();

        if (msg.isFirstAcquirer()) writeBoolean(true);
        else if (fields > 3) writeNull();

        if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount());
        else if (fields > 4) writeNull();

        endList8(fields, token);
    }
}

/**
 * Writes the properties section as a described list.
 *
 * When optimisation is enabled only the fields up to the last one that
 * is set are written, and nothing at all is written if no field is set.
 * Otherwise all thirteen fields are written, with nulls for unset ones.
 */
void MessageEncoder::writeProperties(const Properties& msg)
{
    size_t fields(optimise ? optimisable(msg) : 13);
    if (fields) {
        void* token = startList32(&qpid::amqp::message::PROPERTIES);
        // The first field is always present in a non-empty list.
        if (msg.hasMessageId()) writeString(msg.getMessageId());
        else writeNull();

        // From here on, an unset field is written as null only when a
        // later field follows it.
        if (msg.hasUserId()) writeBinary(msg.getUserId());
        else if (fields > 1) writeNull();

        if (msg.hasTo()) writeString(msg.getTo());
        else if (fields > 2) writeNull();

        if (msg.hasSubject()) writeString(msg.getSubject());
        else if (fields > 3) writeNull();

        if (msg.hasReplyTo()) writeString(msg.getReplyTo());
        else if (fields > 4) writeNull();

        if (msg.hasCorrelationId()) writeString(msg.getCorrelationId());
        else if (fields > 5) writeNull();

        if (msg.hasContentType()) writeSymbol(msg.getContentType());
        else if (fields > 6) writeNull();

        if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding());
        else if (fields > 7) writeNull();

        if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime());
        else if (fields > 8) writeNull();

        if (msg.hasCreationTime()) writeLong(msg.getCreationTime());
        else if (fields > 9) writeNull();

        if (msg.hasGroupId()) writeString(msg.getGroupId());
        else if (fields > 10) writeNull();

        if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence());
        else if (fields > 11) writeNull();

        if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId());
        else if (fields > 12) writeNull();

        endList32(fields, token);
    }
}

/**
 * Writes the application-properties section, choosing the map width
 * automatically: the large (32-bit) form is used when optimisation is
 * disabled, or when either the element count or the estimated element
 * size exceeds 255.
 */
void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties)
{
    // NOTE(review): if the 8-bit size field must also cover the count
    // octet, an estimated element size of exactly 255 would not fit in
    // the small form - confirm against the Encoder implementation.
    writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255);
}

/**
 * Writes the application-properties section.
 *
 * @param properties the entries to encode
 * @param large      true to use the 32-bit map form, false for the 8-bit form
 */
void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large)
{
    writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large);
}

/**
 * Writes a Variant map as a described map of string keys to values.
 *
 * @param properties the entries to encode
 * @param d          descriptor passed to the map start call
 * @param large      true to use the 32-bit map form, false for the 8-bit form
 *
 * Nested maps and lists are not encoded; a warning is logged and a null
 * is written in their place so that every key still has a value.
 */
void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large)
{
    void* token = large ? startMap32(d) : startMap8(d);
    for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
        writeString(i->first);
        switch (i->second.getType()) {
          case qpid::types::VAR_MAP:
          case qpid::types::VAR_LIST:
            //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method)
            QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0");
            // deliberate fall-through: the unsupported value is replaced by null
          case qpid::types::VAR_VOID:
            writeNull();
            break;
          case qpid::types::VAR_BOOL:
            writeBoolean(i->second);
            break;
          case qpid::types::VAR_UINT8:
            writeUByte(i->second);
            break;
          case qpid::types::VAR_UINT16:
            writeUShort(i->second);
            break;
          case qpid::types::VAR_UINT32:
            writeUInt(i->second);
            break;
          case qpid::types::VAR_UINT64:
            writeULong(i->second);
            break;
          case qpid::types::VAR_INT8:
            writeByte(i->second);
            break;
          case qpid::types::VAR_INT16:
            writeShort(i->second);
            break;
          case qpid::types::VAR_INT32:
            writeInt(i->second);
            break;
          case qpid::types::VAR_INT64:
            // signed 64-bit values use the signed writer, like the
            // other signed integer cases
            writeLong(i->second);
            break;
          case qpid::types::VAR_FLOAT:
            writeFloat(i->second);
            break;
          case qpid::types::VAR_DOUBLE:
            writeDouble(i->second);
            break;
          case qpid::types::VAR_STRING:
            if (i->second.getEncoding() == BINARY) {
                writeBinary(i->second);
            } else {
                writeString(i->second);
            }
            break;
          case qpid::types::VAR_UUID:
            writeUuid(i->second);
            break;
        }
    }
    // the count passed is keys plus values, i.e. two per entry
    if (large) endMap32(properties.size()*2, token);
    else endMap8(properties.size()*2, token);
}

/**
 * Estimates the buffer space needed for header, properties,
 * application-properties and body.
 *
 * @param h  header section
 * @param p  properties section
 * @param ap application-properties map
 * @param d  body data
 * @return the header estimate plus getEncodedSize(p, ap, d)
 */
size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
    //NOTE: this does not take optional optimisation into account,
    //i.e. it is a 'worst case' estimate for required buffer space
    size_t total(0);

    //header:
    total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/;
    if (h.getPriority() != 4) total += 1;
    if (h.getDeliveryCount()) total += 4;
    if (h.hasTtl()) total += 4;
    return total + getEncodedSize(p, ap, d);
}

/**
 * Estimates the buffer space needed for properties,
 * application-properties and body (no header).
 *
 * @param p  properties section
 * @param ap application-properties map, always sized as a large map
 * @param d  body data; contributes nothing when empty
 */
size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
    //NOTE: this does not take optional optimisation into account,
    //i.e. it is a 'worst case' estimate for required buffer space
    size_t total(0);

    //properties:
    total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/;
    if (p.hasMessageId()) total += encodedSize(p.getMessageId());
    if (p.hasUserId()) total += encodedSize(p.getUserId());
    if (p.hasTo()) total += encodedSize(p.getTo());
    if (p.hasSubject()) total += encodedSize(p.getSubject());
    if (p.hasReplyTo()) total += encodedSize(p.getReplyTo());
    if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId());
    if (p.hasContentType()) total += encodedSize(p.getContentType());
    if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding());
    if (p.hasAbsoluteExpiryTime()) total += 8;
    if (p.hasCreationTime()) total += 8;
    if (p.hasGroupId()) total += encodedSize(p.getGroupId());
    if (p.hasGroupSequence()) total += 4;
    if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId());

    //application-properties:
    total += 3/*descriptor*/ + getEncodedSize(ap, true);

    //body:
    if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);

    return total;
}

/**
 * Estimates the encoded size of a map's keys and values only, without
 * the map's own type code, size and count fields.
 *
 * Nested maps and lists are counted as a single byte, matching the
 * null that writeMap() writes in their place.
 */
size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map)
{
    size_t total = 0;
    for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
        //key:
        total += 1/*code*/ + encodedSize(i->first);

        //value (type code plus fixed-width payload, where there is one):
        switch (i->second.getType()) {
          case qpid::types::VAR_MAP:
          case qpid::types::VAR_LIST:
          case qpid::types::VAR_VOID:
          case qpid::types::VAR_BOOL:
            total += 1;
            break;

          case qpid::types::VAR_UINT8:
          case qpid::types::VAR_INT8:
            total += 2;
            break;

          case qpid::types::VAR_UINT16:
          case qpid::types::VAR_INT16:
            total += 3;
            break;

          case qpid::types::VAR_UINT32:
          case qpid::types::VAR_INT32:
          case qpid::types::VAR_FLOAT:
            total += 5;
            break;

          case qpid::types::VAR_UINT64:
          case qpid::types::VAR_INT64:
          case qpid::types::VAR_DOUBLE:
            total += 9;
            break;

          case qpid::types::VAR_UUID:
            total += 17;
            break;

          case qpid::types::VAR_STRING:
            total += 1/*code*/ + encodedSize(i->second);
            break;
        }
    }
    return total;
}

/**
 * Estimates the encoded size of a whole map: its elements, its size and
 * count fields, and its type code.
 *
 * @param map               the map to size
 * @param alwaysUseLargeMap true to size for the 32-bit form regardless
 *                          of the map's contents
 */
size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap)
{
    size_t total = getEncodedSizeForElements(map);

    //its not just the count that determines whether we can use a small map, but the aggregate size:
    if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
    else total += 1/*size*/ + 1/*count*/;

    total += 1 /*code for map itself*/;

    return total;
}

}} // namespace qpid::amqp