diff options
| author | Gordon Sim <gsim@apache.org> | 2013-10-01 13:30:58 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-10-01 13:30:58 +0000 |
| commit | 86c67bf9778b7bf2f0f9eba7b17b28b112e98dde (patch) | |
| tree | 394630a69d4473766ac8c3561c65b18ac2780d26 /qpid/cpp | |
| parent | e3d9bcdceecfaf359a4f3c82b11b25c8916ed0a6 (diff) | |
| download | qpid-python-86c67bf9778b7bf2f0f9eba7b17b28b112e98dde.tar.gz | |
QPID-5198: ensure qpid::Exception does not leak out from qpid::messaging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1528082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp | 171 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 106 |
4 files changed, 152 insertions, 133 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp index 0232da8ae1..9e1e5f23fe 100644 --- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp @@ -19,7 +19,7 @@ * */ #include "ProtocolRegistry.h" -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/client/LoadPlugins.h" #include <map> @@ -61,7 +61,7 @@ ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant:: Registry::const_iterator i = theRegistry().find(name.asString()); if (i != theRegistry().end()) return (i->second)(url, stripped); else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped); - else throw qpid::Exception("Unsupported protocol: " + name.asString()); + else throw MessagingException("Unsupported protocol: " + name.asString()); } return 0; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index dba5cb1e1c..13d943757e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -584,7 +584,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) lock.notifyAll(); return n; } else if (n == PN_ERR) { - throw qpid::Exception(QPID_MSG("Error on input: " << getError())); + throw MessagingException(QPID_MSG("Error on input: " << getError())); } else { return 0; } @@ -608,7 +608,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) haveOutput = true; return n; } else if (n == PN_ERR) { - throw qpid::Exception(QPID_MSG("Error on output: " << getError())); + throw MessagingException(QPID_MSG("Error on output: " << getError())); } else if (n == PN_EOS) { haveOutput = false; return 0;//Is this right? diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp index 1de7180cb9..504c5030a8 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -20,7 +20,9 @@ */ #include "qpid/messaging/amqp/EncodedMessage.h" #include "qpid/messaging/Address.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/MessageImpl.h" +#include "qpid/Exception.h" #include "qpid/amqp/Decoder.h" #include "qpid/amqp/DataBuilder.h" #include "qpid/amqp/ListBuilder.h" @@ -100,66 +102,73 @@ const char* EncodedMessage::getData() const void EncodedMessage::init(qpid::messaging::MessageImpl& impl) { - //initial scan of raw data - qpid::amqp::Decoder decoder(data, size); - InitialScan reader(*this, impl); - decoder.read(reader); - bareMessage = reader.getBareMessage(); - if (bareMessage.data && !bareMessage.size) { - bareMessage.size = (data + size) - bareMessage.data; + try { + //initial scan of raw data + qpid::amqp::Decoder decoder(data, size); + InitialScan reader(*this, impl); + decoder.read(reader); + bareMessage = reader.getBareMessage(); + if (bareMessage.data && !bareMessage.size) { + bareMessage.size = (data + size) - bareMessage.data; + } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); } - } void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; } void EncodedMessage::populate(qpid::types::Variant::Map& map) const { - //decode application properties - if (applicationProperties) { - qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); - decoder.readMap(map); - } - //add in 'x-amqp-' prefixed values - if (!!firstAcquirer) { - map["x-amqp-first-acquirer"] = firstAcquirer.get(); - } - if (!!deliveryCount) { - map["x-amqp-delivery-count"] = deliveryCount.get(); - } - if (to) { - map["x-amqp-to"] = to.str(); - } - if (!!absoluteExpiryTime) { - map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); - } - if (!!creationTime) { - map["x-amqp-creation-time"] = creationTime.get(); - } - if (groupId) { - map["x-amqp-group-id"] = groupId.str(); - } - if (!!groupSequence) { - map["x-amqp-qroup-sequence"] = groupSequence.get(); - } - if (replyToGroupId) { - map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); - } - //add in any annotations - if (deliveryAnnotations) { - qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); - if (nestAnnotations) { - map["x-amqp-delivery-annotations"] = decoder.readMap(); - } else { + try { + //decode application properties + if (applicationProperties) { + qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); decoder.readMap(map); } - } - if (messageAnnotations) { - qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); - if (nestAnnotations) { - map["x-amqp-message-annotations"] = decoder.readMap(); - } else { - decoder.readMap(map); + //add in 'x-amqp-' prefixed values + if (!!firstAcquirer) { + map["x-amqp-first-acquirer"] = firstAcquirer.get(); + } + if (!!deliveryCount) { + map["x-amqp-delivery-count"] = deliveryCount.get(); + } + if (to) { + map["x-amqp-to"] = to.str(); + } + if (!!absoluteExpiryTime) { + map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); + } + if (!!creationTime) { + map["x-amqp-creation-time"] = creationTime.get(); + } + if (groupId) { + map["x-amqp-group-id"] = groupId.str(); } + if (!!groupSequence) { + map["x-amqp-qroup-sequence"] = groupSequence.get(); + } + if (replyToGroupId) { + map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); + } + //add in any annotations + if (deliveryAnnotations) { + qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); + if (nestAnnotations) { + map["x-amqp-delivery-annotations"] = decoder.readMap(); + } else { + decoder.readMap(map); + } + } + if (messageAnnotations) { + qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); + if (nestAnnotations) { + map["x-amqp-message-annotations"] = decoder.readMap(); + } else { + decoder.readMap(map); + } + } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); } } qpid::amqp::CharSequence EncodedMessage::getBareMessage() const @@ -201,35 +210,39 @@ void EncodedMessage::getCorrelationId(std::string& s) const } void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const { - if (!content.isVoid()) { - c = content;//integer types, floats, bool etc - //TODO: populate raw data? - } else { - if (bodyType.empty() - || bodyType == qpid::amqp::typecodes::BINARY_NAME - || bodyType == qpid::types::encodings::UTF8 - || bodyType == qpid::types::encodings::ASCII) - { - c = std::string(body.data, body.size); - c.setEncoding(bodyType); - } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) { - qpid::amqp::ListBuilder builder; - qpid::amqp::Decoder decoder(body.data, body.size); - decoder.read(builder); - c = builder.getList(); - raw.assign(body.data, body.size); - } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) { - qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map()); - qpid::amqp::Decoder decoder(body.data, body.size); - decoder.read(builder); - c = builder.getValue().asMap(); - raw.assign(body.data, body.size); - } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) { - if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data); - raw.assign(body.data, body.size); - } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) { - raw.assign(body.data, body.size); + try { + if (!content.isVoid()) { + c = content;//integer types, floats, bool etc + //TODO: populate raw data? + } else { + if (bodyType.empty() + || bodyType == qpid::amqp::typecodes::BINARY_NAME + || bodyType == qpid::types::encodings::UTF8 + || bodyType == qpid::types::encodings::ASCII) + { + c = std::string(body.data, body.size); + c.setEncoding(bodyType); + } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) { + qpid::amqp::ListBuilder builder; + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getList(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) { + qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map()); + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getValue().asMap(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) { + if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) { + raw.assign(body.data, body.size); + } } + } catch (const qpid::Exception& e) { + throw FetchError(e.what()); } } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 94851da273..a7118af598 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -21,6 +21,8 @@ #include "qpid/messaging/amqp/SenderContext.h" #include "qpid/messaging/amqp/EncodedMessage.h" #include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/Exception.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MapHandler.h" #include "qpid/amqp/MessageEncoder.h" @@ -435,59 +437,63 @@ void SenderContext::Delivery::reset() void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) { - boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); - - if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered - //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? - if (original->hasHeaderChanged(msg)) { - //since as yet have no annotations, just write the revised header then the rest of the message as received - encoded.resize(16/*max header size*/ + original->getBareMessage().size); - qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + try { + boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); + + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered + //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? + if (original->hasHeaderChanged(msg)) { + //since as yet have no annotations, just write the revised header then the rest of the message as received + encoded.resize(16/*max header size*/ + original->getBareMessage().size); + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + HeaderAdapter header(msg); + encoder.writeHeader(header); + ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); + } else { + //since as yet have no annotations, if the header hasn't + //changed and we still have the original bare message, can + //send the entire content as is + encoded.resize(original->getSize()); + ::memcpy(encoded.getData(), original->getData(), original->getSize()); + } + } else { HeaderAdapter header(msg); + PropertiesAdapter properties(msg, address.getSubject()); + ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); + //compute size: + size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) + + qpid::amqp::MessageEncoder::getEncodedSize(properties) + + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties); + if (msg.getContent().isVoid()) { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes()); + } else { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/; + } + encoded.resize(contentSize); + QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + //write header: encoder.writeHeader(header); - ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); - } else { - //since as yet have no annotations, if the header hasn't - //changed and we still have the original bare message, can - //send the entire content as is - encoded.resize(original->getSize()); - ::memcpy(encoded.getData(), original->getData(), original->getSize()); - } - } else { - HeaderAdapter header(msg); - PropertiesAdapter properties(msg, address.getSubject()); - ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); - //compute size: - size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) - + qpid::amqp::MessageEncoder::getEncodedSize(properties) - + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties); - if (msg.getContent().isVoid()) { - contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes()); - } else { - contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/; - } - encoded.resize(contentSize); - QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") - qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); - //write header: - encoder.writeHeader(header); - //write delivery-annotations, write message-annotations (none yet supported) - //write properties - encoder.writeProperties(properties); - //write application-properties - encoder.writeApplicationProperties(applicationProperties); - //write body - if (!msg.getContent().isVoid()) { - //write as AmqpValue - encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE); - } else if (msg.getBytes().size()) { - encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported - } - if (encoder.getPosition() < encoded.getSize()) { - QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); - encoded.trim(encoder.getPosition()); + //write delivery-annotations, write message-annotations (none yet supported) + //write properties + encoder.writeProperties(properties); + //write application-properties + encoder.writeApplicationProperties(applicationProperties); + //write body + if (!msg.getContent().isVoid()) { + //write as AmqpValue + encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE); + } else if (msg.getBytes().size()) { + encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + } + if (encoder.getPosition() < encoded.getSize()) { + QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); + encoded.trim(encoder.getPosition()); + } + //write footer (no annotations yet supported) } - //write footer (no annotations yet supported) + } catch (const qpid::Exception& e) { + throw SendError(e.what()); } } void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) |
