summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-10-01 13:30:58 +0000
committerGordon Sim <gsim@apache.org>2013-10-01 13:30:58 +0000
commit86c67bf9778b7bf2f0f9eba7b17b28b112e98dde (patch)
tree394630a69d4473766ac8c3561c65b18ac2780d26 /qpid/cpp
parente3d9bcdceecfaf359a4f3c82b11b25c8916ed0a6 (diff)
downloadqpid-python-86c67bf9778b7bf2f0f9eba7b17b28b112e98dde.tar.gz
QPID-5198: ensure qpid::Exception does not leak out from qpid::messaging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1528082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp171
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp106
4 files changed, 152 insertions, 133 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
index 0232da8ae1..9e1e5f23fe 100644
--- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
@@ -19,7 +19,7 @@
*
*/
#include "ProtocolRegistry.h"
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/LoadPlugins.h"
#include <map>
@@ -61,7 +61,7 @@ ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::
Registry::const_iterator i = theRegistry().find(name.asString());
if (i != theRegistry().end()) return (i->second)(url, stripped);
else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
- else throw qpid::Exception("Unsupported protocol: " + name.asString());
+ else throw MessagingException("Unsupported protocol: " + name.asString());
}
return 0;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index dba5cb1e1c..13d943757e 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -584,7 +584,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ throw MessagingException(QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
@@ -608,7 +608,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
haveOutput = true;
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ throw MessagingException(QPID_MSG("Error on output: " << getError()));
} else if (n == PN_EOS) {
haveOutput = false;
return 0;//Is this right?
diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
index 1de7180cb9..504c5030a8 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -20,7 +20,9 @@
*/
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/MessageImpl.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/DataBuilder.h"
#include "qpid/amqp/ListBuilder.h"
@@ -100,66 +102,73 @@ const char* EncodedMessage::getData() const
void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
{
- //initial scan of raw data
- qpid::amqp::Decoder decoder(data, size);
- InitialScan reader(*this, impl);
- decoder.read(reader);
- bareMessage = reader.getBareMessage();
- if (bareMessage.data && !bareMessage.size) {
- bareMessage.size = (data + size) - bareMessage.data;
+ try {
+ //initial scan of raw data
+ qpid::amqp::Decoder decoder(data, size);
+ InitialScan reader(*this, impl);
+ decoder.read(reader);
+ bareMessage = reader.getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = (data + size) - bareMessage.data;
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
-
}
void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; }
void EncodedMessage::populate(qpid::types::Variant::Map& map) const
{
- //decode application properties
- if (applicationProperties) {
- qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
- decoder.readMap(map);
- }
- //add in 'x-amqp-' prefixed values
- if (!!firstAcquirer) {
- map["x-amqp-first-acquirer"] = firstAcquirer.get();
- }
- if (!!deliveryCount) {
- map["x-amqp-delivery-count"] = deliveryCount.get();
- }
- if (to) {
- map["x-amqp-to"] = to.str();
- }
- if (!!absoluteExpiryTime) {
- map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
- }
- if (!!creationTime) {
- map["x-amqp-creation-time"] = creationTime.get();
- }
- if (groupId) {
- map["x-amqp-group-id"] = groupId.str();
- }
- if (!!groupSequence) {
- map["x-amqp-qroup-sequence"] = groupSequence.get();
- }
- if (replyToGroupId) {
- map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
- }
- //add in any annotations
- if (deliveryAnnotations) {
- qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-delivery-annotations"] = decoder.readMap();
- } else {
+ try {
+ //decode application properties
+ if (applicationProperties) {
+ qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
decoder.readMap(map);
}
- }
- if (messageAnnotations) {
- qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-message-annotations"] = decoder.readMap();
- } else {
- decoder.readMap(map);
+ //add in 'x-amqp-' prefixed values
+ if (!!firstAcquirer) {
+ map["x-amqp-first-acquirer"] = firstAcquirer.get();
+ }
+ if (!!deliveryCount) {
+ map["x-amqp-delivery-count"] = deliveryCount.get();
+ }
+ if (to) {
+ map["x-amqp-to"] = to.str();
+ }
+ if (!!absoluteExpiryTime) {
+ map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
+ }
+ if (!!creationTime) {
+ map["x-amqp-creation-time"] = creationTime.get();
+ }
+ if (groupId) {
+ map["x-amqp-group-id"] = groupId.str();
}
+ if (!!groupSequence) {
+ map["x-amqp-qroup-sequence"] = groupSequence.get();
+ }
+ if (replyToGroupId) {
+ map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
+ }
+ //add in any annotations
+ if (deliveryAnnotations) {
+ qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-delivery-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ if (messageAnnotations) {
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-message-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
@@ -201,35 +210,39 @@ void EncodedMessage::getCorrelationId(std::string& s) const
}
void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- if (!content.isVoid()) {
- c = content;//integer types, floats, bool etc
- //TODO: populate raw data?
- } else {
- if (bodyType.empty()
- || bodyType == qpid::amqp::typecodes::BINARY_NAME
- || bodyType == qpid::types::encodings::UTF8
- || bodyType == qpid::types::encodings::ASCII)
- {
- c = std::string(body.data, body.size);
- c.setEncoding(bodyType);
- } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
- qpid::amqp::ListBuilder builder;
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getList();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
- qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getValue().asMap();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
- if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
- raw.assign(body.data, body.size);
+ try {
+ if (!content.isVoid()) {
+ c = content;//integer types, floats, bool etc
+ //TODO: populate raw data?
+ } else {
+ if (bodyType.empty()
+ || bodyType == qpid::amqp::typecodes::BINARY_NAME
+ || bodyType == qpid::types::encodings::UTF8
+ || bodyType == qpid::types::encodings::ASCII)
+ {
+ c = std::string(body.data, body.size);
+ c.setEncoding(bodyType);
+ } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getList();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getValue().asMap();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+ if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
+ raw.assign(body.data, body.size);
+ }
}
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 94851da273..a7118af598 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -21,6 +21,8 @@
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/AddressImpl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MapHandler.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -435,59 +437,63 @@ void SenderContext::Delivery::reset()
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
- boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
-
- if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
- //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
- if (original->hasHeaderChanged(msg)) {
- //since as yet have no annotations, just write the revised header then the rest of the message as received
- encoded.resize(16/*max header size*/ + original->getBareMessage().size);
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ try {
+ boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
+
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
+ //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
+ if (original->hasHeaderChanged(msg)) {
+ //since as yet have no annotations, just write the revised header then the rest of the message as received
+ encoded.resize(16/*max header size*/ + original->getBareMessage().size);
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ HeaderAdapter header(msg);
+ encoder.writeHeader(header);
+ ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
+ } else {
+ //since as yet have no annotations, if the header hasn't
+ //changed and we still have the original bare message, can
+ //send the entire content as is
+ encoded.resize(original->getSize());
+ ::memcpy(encoded.getData(), original->getData(), original->getSize());
+ }
+ } else {
HeaderAdapter header(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
+ ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
+ //compute size:
+ size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+ if (msg.getContent().isVoid()) {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+ } else {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+ }
+ encoded.resize(contentSize);
+ QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ //write header:
encoder.writeHeader(header);
- ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
- } else {
- //since as yet have no annotations, if the header hasn't
- //changed and we still have the original bare message, can
- //send the entire content as is
- encoded.resize(original->getSize());
- ::memcpy(encoded.getData(), original->getData(), original->getSize());
- }
- } else {
- HeaderAdapter header(msg);
- PropertiesAdapter properties(msg, address.getSubject());
- ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
- //compute size:
- size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
- + qpid::amqp::MessageEncoder::getEncodedSize(properties)
- + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
- if (msg.getContent().isVoid()) {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
- } else {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
- }
- encoded.resize(contentSize);
- QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
- //write header:
- encoder.writeHeader(header);
- //write delivery-annotations, write message-annotations (none yet supported)
- //write properties
- encoder.writeProperties(properties);
- //write application-properties
- encoder.writeApplicationProperties(applicationProperties);
- //write body
- if (!msg.getContent().isVoid()) {
- //write as AmqpValue
- encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
- } else if (msg.getBytes().size()) {
- encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
- }
- if (encoder.getPosition() < encoded.getSize()) {
- QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
- encoded.trim(encoder.getPosition());
+ //write delivery-annotations, write message-annotations (none yet supported)
+ //write properties
+ encoder.writeProperties(properties);
+ //write application-properties
+ encoder.writeApplicationProperties(applicationProperties);
+ //write body
+ if (!msg.getContent().isVoid()) {
+ //write as AmqpValue
+ encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+ } else if (msg.getBytes().size()) {
+ encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ }
+ if (encoder.getPosition() < encoded.getSize()) {
+ QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
+ encoded.trim(encoder.getPosition());
+ }
+ //write footer (no annotations yet supported)
}
- //write footer (no annotations yet supported)
+ } catch (const qpid::Exception& e) {
+ throw SendError(e.what());
}
}
void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)