diff options
| author | Gordon Sim <gsim@apache.org> | 2013-09-02 16:22:24 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-09-02 16:22:24 +0000 |
| commit | 4082b715ac94781de6c5af5245e9e3031ed8a8a0 (patch) | |
| tree | 1e39eeee956cf4fda1d2611a5500bce972b1501c /qpid/cpp/src | |
| parent | 54cdb4dcada8cfeb23d756e4980e701ebb382c13 (diff) | |
| download | qpid-python-4082b715ac94781de6c5af5245e9e3031ed8a8a0.tar.gz | |
QPID-5106: handle annotations properly and add option to control whether annotations are nested or not on fetch()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
26 files changed, 137 insertions, 199 deletions
diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.cpp b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp index 805125eb7f..91c9393457 100644 --- a/qpid/cpp/src/qpid/amqp/DataBuilder.cpp +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp @@ -158,7 +158,7 @@ bool DataBuilder::nest(const qpid::types::Variant& n) return true; } -bool DataBuilder::onStartList(uint32_t, const CharSequence&, const Descriptor*) +bool DataBuilder::onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return nest(qpid::types::Variant::List()); } @@ -166,7 +166,7 @@ void DataBuilder::onEndList(uint32_t /*count*/, const Descriptor*) { nested.pop(); } -bool DataBuilder::onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*) +bool DataBuilder::onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*) { return nest(qpid::types::Variant::Map()); } @@ -176,7 +176,7 @@ void DataBuilder::onEndMap(uint32_t /*count*/, const Descriptor*) } bool DataBuilder::onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*) { - return onStartList(count, CharSequence::create(), 0); + return onStartList(count, CharSequence::create(), CharSequence::create(), 0); } void DataBuilder::onEndArray(uint32_t count, const Descriptor*) { diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.h b/qpid/cpp/src/qpid/amqp/DataBuilder.h index 9876a625b1..51ee3da5f8 100644 --- a/qpid/cpp/src/qpid/amqp/DataBuilder.h +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.h @@ -56,8 +56,8 @@ class DataBuilder : public Reader QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); - QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*); - QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); QPID_COMMON_EXTERN void onEndList(uint32_t /*count*/, const Descriptor*); QPID_COMMON_EXTERN void onEndMap(uint32_t /*count*/, const Descriptor*); diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 1d5abc99c7..1058f83e38 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -22,6 +22,7 @@ #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/Constructor.h" #include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/MapBuilder.h" #include "qpid/amqp/Reader.h" #include "qpid/amqp/typecodes.h" #include "qpid/types/Uuid.h" @@ -34,121 +35,13 @@ namespace amqp { using namespace qpid::amqp::typecodes; -Decoder::Decoder(const char* d, size_t s) : start(d), size(s), position(0) {} +Decoder::Decoder(const char* d, size_t s) : start(d), size(s), position(0), current(0) {} -namespace { -class MapBuilder : public Reader -{ - public: - void onNull(const Descriptor*) - { - qpid::types::Variant v; - handle(v, NULL_NAME); - } - void onBoolean(bool v, const Descriptor*) - { - handle(v, BOOLEAN_NAME); - } - void onUByte(uint8_t v, const Descriptor*) - { - handle(v, UBYTE_NAME); - } - void onUShort(uint16_t v, const Descriptor*) - { - handle(v, USHORT_NAME); - } - void onUInt(uint32_t v, const Descriptor*) - { - handle(v, UINT_NAME); - } - void onULong(uint64_t v, const Descriptor*) - { - handle(v, ULONG_NAME); - } - void onByte(int8_t v, const Descriptor*) - { - handle(v, BYTE_NAME); - } - void onShort(int16_t v, const Descriptor*) - { - handle(v, SHORT_NAME); - } - void onInt(int32_t v, const Descriptor*) - { - handle(v, INT_NAME); - } - void onLong(int64_t v, const Descriptor*) - { - handle(v, LONG_NAME); - } - void onFloat(float v, const Descriptor*) - { - handle(v, FLOAT_NAME); - } - void onDouble(double v, const Descriptor*) - { - handle(v, DOUBLE_NAME); - } - void onUuid(const CharSequence& v, const Descriptor*) - { - handle(v, UUID_NAME); - } - void onTimestamp(int64_t v, const Descriptor*) - { - handle(v, TIMESTAMP_NAME); - } - void onBinary(const CharSequence& v, const Descriptor*) - { - handle(v); - } - void onString(const CharSequence& v, const Descriptor*) - { - handle(v); - } - void onSymbol(const CharSequence& v, const Descriptor*) - { - handle(v); - } - MapBuilder(qpid::types::Variant::Map& m) : map(m), state(KEY) {} - private: - qpid::types::Variant::Map& map; - enum {KEY, SKIP, VALUE} state; - std::string key; - - template <typename T> void handle(T value, const std::string& name) - { - switch (state) { - case KEY: - QPID_LOG(warning, "Ignoring key of type " << name); - state = SKIP; - break; - case VALUE: - map[key] = value; - case SKIP: - state = KEY; - break; - } - } - void handle(const CharSequence& value) - { - switch (state) { - case KEY: - key = value.str(); - state = VALUE; - break; - case VALUE: - map[key] = value.str(); - case SKIP: - state = KEY; - break; - } - } -}; -} void Decoder::readMap(qpid::types::Variant::Map& map) { - MapBuilder builder(map); + MapBuilder builder; read(builder); + map = builder.getMap(); } qpid::types::Variant::Map Decoder::readMap() @@ -168,6 +61,7 @@ void Decoder::read(Reader& reader) void Decoder::readOne(Reader& reader) { const char* temp = start + position; + current = position; Constructor c = readConstructor(); if (c.isDescribed) reader.onDescriptor(c.descriptor, temp); readValue(reader, c.code, c.isDescribed ? &c.descriptor : 0); @@ -263,7 +157,7 @@ void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descript break; case LIST0: - reader.onStartList(0, CharSequence::create(), descriptor); + reader.onStartList(0, CharSequence::create(), getCurrent(0), descriptor); reader.onEndList(0, descriptor); break; case LIST8: @@ -333,7 +227,7 @@ void Decoder::readArray32(Reader& reader, const Descriptor* descriptor) void Decoder::readList(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) { - if (reader.onStartList(count, CharSequence::create(data(), size), descriptor)) { + if (reader.onStartList(count, CharSequence::create(data(), size), getCurrent(size), descriptor)) { for (uint32_t i = 0; i < count; ++i) { readOne(reader); } @@ -345,7 +239,7 @@ void Decoder::readList(Reader& reader, uint32_t size, uint32_t count, const Desc } void Decoder::readMap(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) { - if (reader.onStartMap(count, CharSequence::create(data(), size), descriptor)) { + if (reader.onStartMap(count, CharSequence::create(data(), size), getCurrent(size), descriptor)) { for (uint32_t i = 0; i < count; ++i) { readOne(reader); } @@ -401,7 +295,7 @@ Descriptor Decoder::readDescriptor() case ULONG_ZERO: return Descriptor((uint64_t) 0); default: - throw qpid::Exception(QPID_MSG("Expected descriptor of type ulong or symbol; found " << code)); + throw qpid::Exception(QPID_MSG("Expected descriptor of type ulong or symbol; found " << (int)code)); } } @@ -542,4 +436,10 @@ CharSequence Decoder::readRawUuid() size_t Decoder::getPosition() const { return position; } size_t Decoder::getSize() const { return size; } void Decoder::resetSize(size_t s) { size = s; } + +CharSequence Decoder::getCurrent(size_t remaining) const +{ + return CharSequence::create(start + current, (position-current)+remaining); +} + }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Decoder.h b/qpid/cpp/src/qpid/amqp/Decoder.h index 7ddfe0f17f..a78518be2b 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.h +++ b/qpid/cpp/src/qpid/amqp/Decoder.h @@ -77,6 +77,7 @@ class Decoder const char* const start; size_t size; size_t position; + size_t current; void readOne(Reader& reader); void readValue(Reader& reader, uint8_t code, const Descriptor* descriptor); @@ -92,7 +93,7 @@ class Decoder CharSequence readRawUuid(); Constructor readConstructor(); const char* data(); - + CharSequence getCurrent(size_t remaining) const; }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.cpp b/qpid/cpp/src/qpid/amqp/Descriptor.cpp index 1d2df01e6e..384eba15b0 100644 --- a/qpid/cpp/src/qpid/amqp/Descriptor.cpp +++ b/qpid/cpp/src/qpid/amqp/Descriptor.cpp @@ -59,7 +59,7 @@ std::ostream& operator<<(std::ostream& os, const Descriptor& d) else os << "null"; break; case Descriptor::NUMERIC: - os << d.value.code; + os << "0x" << std::hex << d.value.code; break; } return os; diff --git a/qpid/cpp/src/qpid/amqp/ListReader.h b/qpid/cpp/src/qpid/amqp/ListReader.h index dce874bf2f..fafe2a1f9c 100644 --- a/qpid/cpp/src/qpid/amqp/ListReader.h +++ b/qpid/cpp/src/qpid/amqp/ListReader.h @@ -53,10 +53,10 @@ class ListReader : public Reader virtual void onString(const CharSequence& v, const Descriptor* descriptor) { getReader().onString(v, descriptor); } virtual void onSymbol(const CharSequence& v, const Descriptor* descriptor) { getReader().onSymbol(v, descriptor); } - virtual bool onStartList(uint32_t count, const CharSequence& v, const Descriptor* descriptor) + virtual bool onStartList(uint32_t count, const CharSequence& elements, const CharSequence& all, const Descriptor* descriptor) { ++level; - getReader().onStartList(count, v, descriptor); + getReader().onStartList(count, elements, all, descriptor); return false; } virtual void onEndList(uint32_t count, const Descriptor* descriptor) @@ -64,10 +64,10 @@ class ListReader : public Reader --level; getReader().onEndList(count, descriptor); } - virtual bool onStartMap(uint32_t count, const CharSequence& v, const Descriptor* descriptor) + virtual bool onStartMap(uint32_t count, const CharSequence& elements, const CharSequence& all, const Descriptor* descriptor) { ++level; - getReader().onStartMap(count, v, descriptor); + getReader().onStartMap(count, elements, all, descriptor); return false; } virtual void onEndMap(uint32_t count, const Descriptor* descriptor) diff --git a/qpid/cpp/src/qpid/amqp/MapReader.cpp b/qpid/cpp/src/qpid/amqp/MapReader.cpp index aff885c5d3..b6c31849f0 100644 --- a/qpid/cpp/src/qpid/amqp/MapReader.cpp +++ b/qpid/cpp/src/qpid/amqp/MapReader.cpp @@ -219,7 +219,7 @@ void MapReader::onSymbol(const CharSequence& v, const Descriptor* d) } } -bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descriptor* d) +bool MapReader::onStartList(uint32_t count, const CharSequence&, const CharSequence&, const Descriptor* d) { if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { @@ -232,7 +232,7 @@ bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descripto return true; } -bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor* d) +bool MapReader::onStartMap(uint32_t count, const CharSequence&, const CharSequence&, const Descriptor* d) { if (level++) { if (key) { diff --git a/qpid/cpp/src/qpid/amqp/MapReader.h b/qpid/cpp/src/qpid/amqp/MapReader.h index db39d25895..875f919d63 100644 --- a/qpid/cpp/src/qpid/amqp/MapReader.h +++ b/qpid/cpp/src/qpid/amqp/MapReader.h @@ -87,8 +87,8 @@ class MapReader : public Reader QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); - QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*); - QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); QPID_COMMON_EXTERN void onEndList(uint32_t /*count*/, const Descriptor*); QPID_COMMON_EXTERN void onEndMap(uint32_t /*count*/, const Descriptor*); diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.cpp b/qpid/cpp/src/qpid/amqp/MessageReader.cpp index 726ee087d1..42faf9d08b 100644 --- a/qpid/cpp/src/qpid/amqp/MessageReader.cpp +++ b/qpid/cpp/src/qpid/amqp/MessageReader.cpp @@ -187,10 +187,10 @@ void MessageReader::PropertiesReader::onNull(const Descriptor*) } //header, properties, amqp-sequence, amqp-value -bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const Descriptor* descriptor) +bool MessageReader::onStartList(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor) { if (delegate) { - return delegate->onStartList(count, raw, descriptor); + return delegate->onStartList(count, elements, raw, descriptor); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got no descriptor for list."); @@ -205,7 +205,7 @@ bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const D onAmqpSequence(raw); return false; } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onAmqpValue(raw, qpid::amqp::typecodes::LIST_NAME); + onAmqpValue(elements, qpid::amqp::typecodes::LIST_NAME); return false; } else { QPID_LOG(warning, "Unexpected described list: " << *descriptor); @@ -225,28 +225,28 @@ void MessageReader::onEndList(uint32_t count, const Descriptor* descriptor) } //delivery-annotations, message-annotations, application-properties, amqp-value -bool MessageReader::onStartMap(uint32_t count, const CharSequence& raw, const Descriptor* descriptor) +bool MessageReader::onStartMap(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor) { if (delegate) { - return delegate->onStartMap(count, raw, descriptor); + return delegate->onStartMap(count, elements, raw, descriptor); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got no descriptor for map."); return false; } else if (descriptor->match(DELIVERY_ANNOTATIONS_SYMBOL, DELIVERY_ANNOTATIONS_CODE)) { - onDeliveryAnnotations(raw); + onDeliveryAnnotations(elements, raw); return false; } else if (descriptor->match(MESSAGE_ANNOTATIONS_SYMBOL, MESSAGE_ANNOTATIONS_CODE)) { - onMessageAnnotations(raw); + onMessageAnnotations(elements, raw); return false; } else if (descriptor->match(FOOTER_SYMBOL, FOOTER_CODE)) { - onFooter(raw); + onFooter(elements, raw); return false; } else if (descriptor->match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE)) { - onApplicationProperties(raw); + onApplicationProperties(elements, raw); return false; } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onAmqpValue(raw, qpid::amqp::typecodes::MAP_NAME); + onAmqpValue(elements, qpid::amqp::typecodes::MAP_NAME); return false; } else { QPID_LOG(warning, "Unexpected described map: " << *descriptor); diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.h b/qpid/cpp/src/qpid/amqp/MessageReader.h index 2dc112a28a..d4627d2aa5 100644 --- a/qpid/cpp/src/qpid/amqp/MessageReader.h +++ b/qpid/cpp/src/qpid/amqp/MessageReader.h @@ -24,7 +24,6 @@ #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/Reader.h" -#include "qpid/amqp/ListReader.h" #include "qpid/types/Variant.h" #include "qpid/CommonImportExport.h" @@ -40,11 +39,11 @@ class MessageReader : public Reader QPID_COMMON_EXTERN MessageReader(); //header, properties, amqp-sequence, amqp-value - QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN void onEndList(uint32_t, const Descriptor*); //delivery-annotations, message-annotations, application-headers, amqp-value - QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*); QPID_COMMON_EXTERN void onEndMap(uint32_t, const Descriptor*); //data, amqp-value @@ -95,16 +94,16 @@ class MessageReader : public Reader virtual void onGroupSequence(uint32_t) = 0; virtual void onReplyToGroupId(const CharSequence&) = 0; - virtual void onApplicationProperties(const CharSequence&) = 0; - virtual void onDeliveryAnnotations(const CharSequence&) = 0; - virtual void onMessageAnnotations(const CharSequence&) = 0; + virtual void onApplicationProperties(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + virtual void onDeliveryAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + virtual void onMessageAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; virtual void onData(const CharSequence&) = 0; virtual void onAmqpSequence(const CharSequence&) = 0; virtual void onAmqpValue(const CharSequence&, const std::string& type) = 0; virtual void onAmqpValue(const qpid::types::Variant&) = 0; - virtual void onFooter(const CharSequence&) = 0; + virtual void onFooter(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; QPID_COMMON_EXTERN CharSequence getBareMessage() const; diff --git a/qpid/cpp/src/qpid/amqp/Reader.h b/qpid/cpp/src/qpid/amqp/Reader.h index 64019d1521..32f33dc33f 100644 --- a/qpid/cpp/src/qpid/amqp/Reader.h +++ b/qpid/cpp/src/qpid/amqp/Reader.h @@ -63,8 +63,8 @@ class Reader * @return true to get elements of the compound value, false * to skip over it */ - virtual bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; } - virtual bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; } + virtual bool onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; } + virtual bool onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; } virtual bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*) { return true; } virtual void onEndList(uint32_t /*count*/, const Descriptor*) {} virtual void onEndMap(uint32_t /*count*/, const Descriptor*) {} diff --git a/qpid/cpp/src/qpid/amqp/SaslClient.cpp b/qpid/cpp/src/qpid/amqp/SaslClient.cpp index 69660e9294..d8a38750c5 100644 --- a/qpid/cpp/src/qpid/amqp/SaslClient.cpp +++ b/qpid/cpp/src/qpid/amqp/SaslClient.cpp @@ -116,7 +116,7 @@ class SaslOutcomeReader : public Reader }; } -bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor) +bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor) { if (!descriptor) { QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); diff --git a/qpid/cpp/src/qpid/amqp/SaslClient.h b/qpid/cpp/src/qpid/amqp/SaslClient.h index e10504c610..d22887de1a 100644 --- a/qpid/cpp/src/qpid/amqp/SaslClient.h +++ b/qpid/cpp/src/qpid/amqp/SaslClient.h @@ -46,7 +46,7 @@ class SaslClient : public Sasl QPID_COMMON_EXTERN void response(const std::string*); private: - QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor); + QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor); }; diff --git a/qpid/cpp/src/qpid/amqp/SaslServer.cpp b/qpid/cpp/src/qpid/amqp/SaslServer.cpp index 403730ad69..250858bda0 100644 --- a/qpid/cpp/src/qpid/amqp/SaslServer.cpp +++ b/qpid/cpp/src/qpid/amqp/SaslServer.cpp @@ -158,7 +158,7 @@ class SaslResponseReader : public Reader }; } -bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor) +bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor) { if (!descriptor) { QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); diff --git a/qpid/cpp/src/qpid/amqp/SaslServer.h b/qpid/cpp/src/qpid/amqp/SaslServer.h index 125bd0278e..68d0854488 100644 --- a/qpid/cpp/src/qpid/amqp/SaslServer.h +++ b/qpid/cpp/src/qpid/amqp/SaslServer.h @@ -43,7 +43,7 @@ class SaslServer : public Sasl QPID_COMMON_EXTERN void completed(bool succeeded); private: - QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor); + QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor); }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp index 957134d0e6..ca3e6daabd 100644 --- a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -160,7 +160,7 @@ void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /* void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) { size_t count = pn_data_get_list(data); - bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); + bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor); if (!skip) { pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { @@ -174,7 +174,7 @@ void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descrip void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) { size_t count = pn_data_get_map(data); - reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor); + reader.onStartMap(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { read(data); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index 5ea9664ea8..12e559cb88 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -125,8 +125,8 @@ namespace { } } - bool onStartList(uint32_t, const CharSequence&, const Descriptor*) { return false; } - bool onStartMap(uint32_t, const CharSequence&, const Descriptor*) { return false; } + bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; } + bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; } bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*) { return false; } public: @@ -257,9 +257,9 @@ void Message::onGroupId(const qpid::amqp::CharSequence&) {} void Message::onGroupSequence(uint32_t) {} void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} -void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; } -void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; } -void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; } +void Message::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { applicationProperties = v; } +void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; } +void Message::onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { messageAnnotations = v; } void Message::onData(const qpid::amqp::CharSequence& v) { body = v; } void Message::onAmqpSequence(const qpid::amqp::CharSequence& v) { body = v; bodyType = qpid::amqp::typecodes::LIST_NAME; } @@ -278,7 +278,7 @@ void Message::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& } void Message::onAmqpValue(const qpid::types::Variant& v) { typedBody = v; } -void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; } +void Message::onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { footer = v; } bool Message::isTypedBody() const { @@ -338,31 +338,41 @@ void Message::decodeHeader(framing::Buffer& buffer) } void Message::decodeContent(framing::Buffer& /*buffer*/) {} -boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& annotations) const +boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& added) const { //message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations - size_t extra = 0; + std::map<std::string, qpid::types::Variant> combined; + const std::map<std::string, qpid::types::Variant>* annotations(0); if (messageAnnotations) { - //TODO: actual merge required + //combine existing and added annotations (TODO: this could be + //optimised by avoiding the decode and simply 'editing' the + //size and count in the raw data, then appending the new + //elements). + qpid::amqp::MapBuilder builder; + qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); + decoder.read(builder); + combined = builder.getMap(); + for (std::map<std::string, qpid::types::Variant>::const_iterator i = added.begin(); i != added.end(); ++i) { + combined[i->first] = i->second; + } + annotations = &combined; } else { - //add whole new section - extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true); + //additions form a whole new section + annotations = &added; } - boost::intrusive_ptr<Message> copy(new Message(data.size()+extra)); + size_t annotationsSize = qpid::amqp::MessageEncoder::getEncodedSize(*annotations, true) + 3/*descriptor*/; + + boost::intrusive_ptr<Message> copy(new Message(bareMessage.size+footer.size+deliveryAnnotations.size+annotationsSize)); size_t position(0); - if (deliveryAnnotations) { + if (deliveryAnnotations.size) { ::memcpy(©->data[position], deliveryAnnotations.data, deliveryAnnotations.size); position += deliveryAnnotations.size; } - if (messageAnnotations) { - //TODO: actual merge required - ::memcpy(©->data[position], messageAnnotations.data, messageAnnotations.size); - position += messageAnnotations.size; - } else { - qpid::amqp::MessageEncoder encoder(©->data[position], extra); - encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true); - position += extra; - } + + qpid::amqp::Encoder encoder(©->data[position], annotationsSize); + encoder.writeMap(*annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true); + position += encoder.getPosition(); + if (bareMessage) { ::memcpy(©->data[position], bareMessage.data, bareMessage.size); position += bareMessage.size; @@ -371,7 +381,18 @@ boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::stri ::memcpy(©->data[position], footer.data, footer.size); position += footer.size; } + copy->data.resize(position);//annotationsSize may be slightly bigger than needed if optimisations are used (e.g. smallint) copy->scan(); + { + qpid::amqp::MapBuilder builder; + qpid::amqp::Decoder decoder(copy->messageAnnotations.data, copy->messageAnnotations.size); + decoder.read(builder); + QPID_LOG(notice, "Merged annotations are now: " << builder.getMap() << " raw=" << std::hex << std::string(copy->messageAnnotations.data, copy->messageAnnotations.size) << " " << copy->messageAnnotations.size << " bytes"); + } + assert(copy->messageAnnotations); + assert(copy->bareMessage.size == bareMessage.size); + assert(copy->footer.size == footer.size); + assert(copy->deliveryAnnotations.size == deliveryAnnotations.size); return copy; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index c7e2df84aa..3a7c4529de 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -140,16 +140,16 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess void onGroupSequence(uint32_t); void onReplyToGroupId(const qpid::amqp::CharSequence&); - void onApplicationProperties(const qpid::amqp::CharSequence&); - void onDeliveryAnnotations(const qpid::amqp::CharSequence&); - void onMessageAnnotations(const qpid::amqp::CharSequence&); + void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); void onData(const qpid::amqp::CharSequence&); void onAmqpSequence(const qpid::amqp::CharSequence&); void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type); void onAmqpValue(const qpid::types::Variant&); - void onFooter(const qpid::amqp::CharSequence&); + void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 188738287e..cf85c1f05a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -273,10 +273,11 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation void Translation::write(OutgoingFromQueue& out) { - const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); + const Message* message = dynamic_cast<const Message*>(original.getPersistentContext().get()); + //persistent context will contain any newly added annotations + if (!message) message = dynamic_cast<const Message*>(&original.getEncoding()); if (message) { //write annotations - //TODO: merge in any newly added annotations qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations(); qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations(); if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size); diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp index 746666a79c..6304263844 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -115,6 +115,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& reconnectOnLimitExceeded = value; } else if (name == "container-id" || name == "container_id") { identifier = value.asString(); + } else if (name == "nest-annotations" || name == "nest_annotations") { + nestAnnotations = value; } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h index 085615d5d4..5942592d78 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h @@ -45,6 +45,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings int32_t retries; bool reconnectOnLimitExceeded; std::string identifier; + bool nestAnnotations; QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 0d2640eb26..0d4885c4c3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -203,6 +203,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared if (current) { qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); + encoded->setNestAnnotationsOption(nestAnnotations); ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); encoded->trim((size_t) read); diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp index 09a5ea4904..266060c117 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -36,17 +36,17 @@ namespace messaging { namespace amqp { using namespace qpid::amqp; -EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0) +EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0), nestAnnotations(false) { init(); } -EncodedMessage::EncodedMessage() : size(0), data(0) +EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false) { init(); } -EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0) +EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0), nestAnnotations(false) { init(); } @@ -110,6 +110,8 @@ void EncodedMessage::init(qpid::messaging::MessageImpl& impl) } } +void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; } + void EncodedMessage::populate(qpid::types::Variant::Map& map) const { //decode application properties @@ -144,14 +146,20 @@ void EncodedMessage::populate(qpid::types::Variant::Map& map) const } //add in any annotations if (deliveryAnnotations) { - qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap(); qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); - decoder.readMap(annotations); + if (nestAnnotations) { + map["x-amqp-delivery-annotations"] = decoder.readMap(); + } else { + decoder.readMap(map); + } } if (messageAnnotations) { - qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap(); qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); - decoder.readMap(annotations); + if (nestAnnotations) { + map["x-amqp-message-annotations"] = decoder.readMap(); + } else { + decoder.readMap(map); + } } } qpid::amqp::CharSequence EncodedMessage::getBareMessage() const @@ -284,9 +292,9 @@ void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; } void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; } -void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; } -void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; } -void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; } +void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.applicationProperties = v; } +void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.deliveryAnnotations = v; } +void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.messageAnnotations = v; } void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v) { @@ -313,6 +321,6 @@ void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v) em.content = v; } -void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; } +void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.footer = v; } }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h index 90ac09e735..16a43aecea 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h @@ -86,6 +86,7 @@ class EncodedMessage QPID_MESSAGING_EXTERN void trim(size_t); QPID_MESSAGING_EXTERN void resize(size_t); + QPID_MESSAGING_EXTERN void setNestAnnotationsOption(bool); void getReplyTo(qpid::messaging::Address&) const; void getSubject(std::string&) const; void getContentType(std::string&) const; @@ -102,6 +103,7 @@ class EncodedMessage private: size_t size; char* data; + bool nestAnnotations; class InitialScan : public qpid::amqp::MessageReader { @@ -130,16 +132,16 @@ class EncodedMessage void onGroupSequence(uint32_t); void onReplyToGroupId(const qpid::amqp::CharSequence&); - void onApplicationProperties(const qpid::amqp::CharSequence&); - void onDeliveryAnnotations(const qpid::amqp::CharSequence&); - void onMessageAnnotations(const qpid::amqp::CharSequence&); + void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); + void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); void onData(const qpid::amqp::CharSequence&); void onAmqpSequence(const qpid::amqp::CharSequence&); void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type); void onAmqpValue(const qpid::types::Variant&); - void onFooter(const qpid::amqp::CharSequence&); + void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&); private: EncodedMessage& em; qpid::messaging::MessageImpl& mi; diff --git a/qpid/cpp/src/tests/failing-amqp1.0-python-tests b/qpid/cpp/src/tests/failing-amqp1.0-python-tests index e69de29bb2..2cae7b6306 100644 --- a/qpid/cpp/src/tests/failing-amqp1.0-python-tests +++ b/qpid/cpp/src/tests/failing-amqp1.0-python-tests @@ -0,0 +1,2 @@ +qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers +qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests index 2ad17ebc33..6f862ffa2d 100755 --- a/qpid/cpp/src/tests/swig_python_tests +++ b/qpid/cpp/src/tests/swig_python_tests @@ -53,7 +53,7 @@ export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG $QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 if [[ -a $AMQPC_LIB ]] ; then export QPID_LOAD_MODULE=$AMQPC_LIB - $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 fi stop_broker if [[ $FAILED -eq 1 ]]; then |
