/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/messaging/amqp/EncodedMessage.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/amqp/Decoder.h" #include #include namespace qpid { namespace messaging { namespace amqp { using namespace qpid::amqp; EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0) { init(); } EncodedMessage::EncodedMessage() : size(0), data(0) { init(); } EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0) { init(); } void EncodedMessage::init() { //init all CharSequence members deliveryAnnotations.init(); messageAnnotations.init(); userId.init(); to.init(); subject.init(); replyTo.init(); contentType.init(); contentEncoding.init(); groupId.init(); replyToGroupId.init(); applicationProperties.init(); body.init(); footer.init(); } EncodedMessage::~EncodedMessage() { delete[] data; } size_t EncodedMessage::getSize() const { return size; } void EncodedMessage::trim(size_t t) { size = t; } void EncodedMessage::resize(size_t s) { delete[] data; size = s; data = new char[size]; } char* EncodedMessage::getData() { return data; } const char* EncodedMessage::getData() const { return data; } void EncodedMessage::init(qpid::messaging::MessageImpl& impl) { //initial scan of raw data qpid::amqp::Decoder decoder(data, size); InitialScan reader(*this, impl); decoder.read(reader); bareMessage = reader.getBareMessage(); if (bareMessage.data && !bareMessage.size) { bareMessage.size = (data + size) - bareMessage.data; } } void EncodedMessage::populate(qpid::types::Variant::Map& map) const { //decode application properties if (applicationProperties) { qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); decoder.readMap(map); } //add in 'x-amqp-' prefixed values if (!!firstAcquirer) { map["x-amqp-first-acquirer"] = firstAcquirer.get(); } if (!!deliveryCount) { map["x-amqp-delivery-count"] = deliveryCount.get(); } if (to) { map["x-amqp-delivery-count"] = to.str(); } if (!!absoluteExpiryTime) { map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); } if (!!creationTime) { map["x-amqp-creation-time"] = creationTime.get(); } if (groupId) { map["x-amqp-group-id"] = groupId.str(); } if (!!groupSequence) { map["x-amqp-qroup-sequence"] = groupSequence.get(); } if (replyToGroupId) { map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); } //add in any annotations if (deliveryAnnotations) { qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap(); qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); decoder.readMap(annotations); } if (messageAnnotations) { qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap(); qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); decoder.readMap(annotations); } } qpid::amqp::CharSequence EncodedMessage::getBareMessage() const { return bareMessage; } void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const { a = qpid::messaging::Address(replyTo.str()); } void EncodedMessage::getSubject(std::string& s) const { s.assign(subject.data, subject.size); } void EncodedMessage::getContentType(std::string& s) const { s.assign(contentType.data, contentType.size); } void EncodedMessage::getUserId(std::string& s) const { s.assign(userId.data, userId.size); } void EncodedMessage::getMessageId(std::string& s) const { messageId.assign(s); } void EncodedMessage::getCorrelationId(std::string& s) const { correlationId.assign(s); } void EncodedMessage::getBody(std::string& s) const { s.assign(body.data, body.size); } qpid::amqp::CharSequence EncodedMessage::getBody() const { return body; } bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const { if (!durable) { if (msg.isDurable()) return true; } else { if (durable.get() != msg.isDurable()) return true; } if (!priority) { if (msg.getPriority() != 4) return true; } else { if (priority.get() != msg.getPriority()) return true; } if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) { return true; } //first-acquirer can't be changed via Message interface as yet if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) { return true; } return false; } EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m) { //set up defaults as needed: mi.setPriority(4); } //header: void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; } void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; } void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; } void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; } void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i) { mi.setRedelivered(i); em.deliveryCount = i; } //properties: void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); } void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); } void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; } void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; } void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; } void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;} void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); } void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); } void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; } void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; } void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; } void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; } void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; } void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; } void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; } void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; } void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; } void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; } void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { //TODO: how to communicate the type, i.e. descriptor? em.body = v; } void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; } }}} // namespace qpid::messaging::amqp