diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:42 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:42 +0000 |
| commit | 144f3c698bdddf22509691a4f285305e9fd83291 (patch) | |
| tree | 2a399e0c06586c0ab5cab8986eb317767ac4080d /qpid/cpp | |
| parent | 4c5376ff911c42a9e43115d38c6751573d8af514 (diff) | |
| download | qpid-python-144f3c698bdddf22509691a4f285305e9fd83291.tar.gz | |
QPID-5040: support for sending and receiving messages with AmqpValue sections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
38 files changed, 880 insertions, 521 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp index 563e5e5060..fe4ea7950d 100644 --- a/qpid/cpp/examples/messaging/drain.cpp +++ b/qpid/cpp/examples/messaging/drain.cpp @@ -90,13 +90,11 @@ int main(int argc, char** argv) int count = options.getCount(); Message message; int i = 0; - + while (receiver.fetch(message, timeout)) { std::cout << "Message(properties=" << message.getProperties() << ", content='" ; if (message.getContentType() == "amqp/map") { - Variant::Map map; - decode(message, map); - std::cout << map; + std::cout << message.getContentObject().asMap(); } else { std::cout << message.getContent(); } @@ -106,7 +104,7 @@ int main(int argc, char** argv) break; } receiver.close(); - session.close(); + session.close(); connection.close(); return 0; } catch(const std::exception& error) { @@ -114,5 +112,5 @@ int main(int argc, char** argv) connection.close(); } } - return 1; + return 1; } diff --git a/qpid/cpp/examples/messaging/map_receiver.cpp b/qpid/cpp/examples/messaging/map_receiver.cpp index 081f7394a8..96bc76b821 100644 --- a/qpid/cpp/examples/messaging/map_receiver.cpp +++ b/qpid/cpp/examples/messaging/map_receiver.cpp @@ -39,15 +39,13 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; const char* address = argc>2 ? argv[2] : "message_queue; {create: always}"; std::string connectionOptions = argc > 3 ? argv[3] : ""; - + Connection connection(url, connectionOptions); try { connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); - Variant::Map content; - decode(receiver.fetch(), content); - std::cout << content << std::endl; + std::cout << receiver.fetch().getContentObject() << std::endl; session.acknowledge(); receiver.close(); connection.close(); @@ -56,5 +54,5 @@ int main(int argc, char** argv) { std::cout << error.what() << std::endl; connection.close(); } - return 1; + return 1; } diff --git a/qpid/cpp/examples/messaging/map_sender.cpp b/qpid/cpp/examples/messaging/map_sender.cpp index 8ce3e1d8ec..81ac7320d8 100644 --- a/qpid/cpp/examples/messaging/map_sender.cpp +++ b/qpid/cpp/examples/messaging/map_sender.cpp @@ -39,7 +39,7 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; const char* address = argc>2 ? argv[2] : "message_queue; {create: always}"; std::string connectionOptions = argc > 3 ? argv[3] : ""; - + Connection connection(url, connectionOptions); try { connection.open(); @@ -57,8 +57,8 @@ int main(int argc, char** argv) { colours.push_back(Variant("white")); content["colours"] = colours; content["uuid"] = Uuid(true); - encode(content, message); - + message.setContentObject(content); + sender.send(message, true); connection.close(); diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp index 72fcdc7c65..e80b434b3e 100644 --- a/qpid/cpp/examples/messaging/spout.cpp +++ b/qpid/cpp/examples/messaging/spout.cpp @@ -145,7 +145,7 @@ int main(int argc, char** argv) if (options.entries.size()) { Variant::Map content; options.setEntries(content); - encode(content, message); + message.getContentObject() = content; } else if (options.content.size()) { message.setContent(options.content); message.setContentType("text/plain"); diff --git a/qpid/cpp/include/qpid/messaging/Message.h b/qpid/cpp/include/qpid/messaging/Message.h index 5b14c7cf27..10569eb006 100644 --- a/qpid/cpp/include/qpid/messaging/Message.h +++ b/qpid/cpp/include/qpid/messaging/Message.h @@ -42,6 +42,7 @@ class MessageImpl; class QPID_MESSAGING_CLASS_EXTERN Message { public: + QPID_MESSAGING_EXTERN Message(qpid::types::Variant&); QPID_MESSAGING_EXTERN Message(const std::string& bytes = std::string()); QPID_MESSAGING_EXTERN Message(const char*, size_t); QPID_MESSAGING_EXTERN Message(const Message&); @@ -164,6 +165,27 @@ class QPID_MESSAGING_CLASS_EXTERN Message /** Get the content as a std::string */ QPID_MESSAGING_EXTERN std::string getContent() const; + /** Get the content as raw bytes (an alias for getContent() */ + QPID_MESSAGING_EXTERN std::string getContentBytes() const; + /** Set the content as raw bytes (an alias for setContent() */ + QPID_MESSAGING_EXTERN void setContentBytes(const std::string&); + /** + * Get the content as a Variant, which can represent an object of + * different types. This can be used for content representing a + * map or a list for example. + */ + QPID_MESSAGING_EXTERN qpid::types::Variant& getContentObject(); + /** + * Get the content as a Variant, which can represent an object of + * different types. This can be used for content representing a + * map or a list for example. + */ + QPID_MESSAGING_EXTERN const qpid::types::Variant& getContentObject() const; + /** + * Set the content using a Variant, which can represent an object + * of different types. + */ + QPID_MESSAGING_EXTERN void setContentObject(const qpid::types::Variant&); /** * Get a const pointer to the start of the content data. The * memory pointed to is owned by the message. The getContentSize() diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 081fce5220..3c77c4e33f 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1026,12 +1026,16 @@ set (qpidcommon_SOURCES qpid/amqp_0_10/Codecs.cpp qpid/amqp/CharSequence.h qpid/amqp/CharSequence.cpp + qpid/amqp/DataBuilder.h + qpid/amqp/DataBuilder.cpp qpid/amqp/Decoder.h qpid/amqp/Decoder.cpp qpid/amqp/Descriptor.h qpid/amqp/Descriptor.cpp qpid/amqp/Encoder.h qpid/amqp/Encoder.cpp + qpid/amqp/ListBuilder.h + qpid/amqp/ListBuilder.cpp qpid/amqp/MapHandler.h qpid/amqp/MapEncoder.h qpid/amqp/MapEncoder.cpp diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.cpp b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp new file mode 100644 index 0000000000..805125eb7f --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp @@ -0,0 +1,194 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataBuilder.h" +#include "CharSequence.h" +#include "qpid/log/Statement.h" +#include "qpid/types/encodings.h" + +namespace qpid { +namespace amqp { + +void DataBuilder::onNull(const Descriptor*) +{ + handle(qpid::types::Variant()); +} +void DataBuilder::onBoolean(bool v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUByte(uint8_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUShort(uint16_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUInt(uint32_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onULong(uint64_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onByte(int8_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onShort(int16_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onInt(int32_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onLong(int64_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onFloat(float v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onDouble(double v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUuid(const CharSequence& v, const Descriptor*) +{ + if (v.size == qpid::types::Uuid::SIZE) { + handle(qpid::types::Uuid(v.data)); + } +} +void DataBuilder::onTimestamp(int64_t v, const Descriptor*) +{ + handle(v); +} + +void DataBuilder::handle(const qpid::types::Variant& v) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + nested.push(&nested.top()->asMap()[v.asString()]); + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(v); + break; + default: + *(nested.top()) = v; + nested.pop(); + break; + } +} + +void DataBuilder::onBinary(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::BINARY); +} +void DataBuilder::onString(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::UTF8); +} +void DataBuilder::onSymbol(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::ASCII); +} + +void DataBuilder::onString(const std::string& value, const std::string& encoding) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + nested.push(&nested.top()->asMap()[value]); + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(qpid::types::Variant(value)); + nested.top()->asList().back().setEncoding(encoding); + break; + default: + qpid::types::Variant& v = *(nested.top()); + v = value; + v.setEncoding(encoding); + nested.pop(); + break; + } +} + +bool DataBuilder::proceed() +{ + return !nested.empty(); +} + +bool DataBuilder::nest(const qpid::types::Variant& n) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + QPID_LOG(error, QPID_MSG("Expecting map key; got " << n)); + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(n); + nested.push(&nested.top()->asList().back()); + break; + default: + qpid::types::Variant& value = *(nested.top()); + value = n; + nested.pop(); + nested.push(&value); + break; + } + return true; +} + +bool DataBuilder::onStartList(uint32_t, const CharSequence&, const Descriptor*) +{ + return nest(qpid::types::Variant::List()); +} +void DataBuilder::onEndList(uint32_t /*count*/, const Descriptor*) +{ + nested.pop(); +} +bool DataBuilder::onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*) +{ + return nest(qpid::types::Variant::Map()); +} +void DataBuilder::onEndMap(uint32_t /*count*/, const Descriptor*) +{ + nested.pop(); +} +bool DataBuilder::onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*) +{ + return onStartList(count, CharSequence::create(), 0); +} +void DataBuilder::onEndArray(uint32_t count, const Descriptor*) +{ + onEndList(count, 0); +} +qpid::types::Variant& DataBuilder::getValue() +{ + return base; +} +DataBuilder::DataBuilder(qpid::types::Variant v) : base(v) +{ + nested.push(&base); +} +DataBuilder::~DataBuilder() {} +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.h b/qpid/cpp/src/qpid/amqp/DataBuilder.h new file mode 100644 index 0000000000..672584d73d --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.h @@ -0,0 +1,78 @@ +#ifndef QPID_AMQP_DATABUILDER_H +#define QPID_AMQP_DATABUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "qpid/types/Variant.h" +#include <stack> + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant based structure (or value) from a data stream + */ +class DataBuilder : public Reader +{ + public: + DataBuilder(qpid::types::Variant); + virtual ~DataBuilder(); + void onNull(const Descriptor*); + void onBoolean(bool, const Descriptor*); + void onUByte(uint8_t, const Descriptor*); + void onUShort(uint16_t, const Descriptor*); + void onUInt(uint32_t, const Descriptor*); + void onULong(uint64_t, const Descriptor*); + void onByte(int8_t, const Descriptor*); + void onShort(int16_t, const Descriptor*); + void onInt(int32_t, const Descriptor*); + void onLong(int64_t, const Descriptor*); + void onFloat(float, const Descriptor*); + void onDouble(double, const Descriptor*); + void onUuid(const CharSequence&, const Descriptor*); + void onTimestamp(int64_t, const Descriptor*); + + void onBinary(const CharSequence&, const Descriptor*); + void onString(const CharSequence&, const Descriptor*); + void onSymbol(const CharSequence&, const Descriptor*); + + bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + void onEndList(uint32_t /*count*/, const Descriptor*); + void onEndMap(uint32_t /*count*/, const Descriptor*); + void onEndArray(uint32_t /*count*/, const Descriptor*); + + bool proceed(); + qpid::types::Variant& getValue(); + private: + qpid::types::Variant base; + std::stack<qpid::types::Variant*> nested; + std::string key; + + void handle(const qpid::types::Variant& v); + bool nest(const qpid::types::Variant& v); + void onString(const std::string&, const std::string&); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DATABUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Encoder.cpp b/qpid/cpp/src/qpid/amqp/Encoder.cpp index 549b6d1e4e..627cc4aed6 100644 --- a/qpid/cpp/src/qpid/amqp/Encoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Encoder.cpp @@ -23,11 +23,15 @@ #include "qpid/amqp/Descriptor.h" #include "qpid/amqp/typecodes.h" #include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/types/encodings.h" #include "qpid/log/Statement.h" #include "qpid/Exception.h" #include <assert.h> #include <string.h> +using namespace qpid::types::encodings; + namespace qpid { namespace amqp { @@ -373,6 +377,87 @@ void Encoder::endArray32(size_t count, void* token) end<uint32_t>(count, token, data+position); } +void Encoder::writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d, bool large) +{ + void* token = large ? startMap32(d) : startMap8(d); + for (qpid::types::Variant::Map::const_iterator i = value.begin(); i != value.end(); ++i) { + writeString(i->first); + writeValue(i->second); + } + if (large) endMap32(value.size()*2, token); + else endMap8(value.size()*2, token); +} + +void Encoder::writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d, bool large) +{ + void* token = large ? startList32(d) : startList8(d); + for (qpid::types::Variant::List::const_iterator i = value.begin(); i != value.end(); ++i) { + writeValue(*i); + } + if (large) endList32(value.size(), token); + else endList8(value.size(), token); +} + +void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d) +{ + switch (value.getType()) { + case qpid::types::VAR_VOID: + writeNull(d); + break; + case qpid::types::VAR_BOOL: + writeBoolean(value.asBool(), d); + break; + case qpid::types::VAR_UINT8: + writeUByte(value.asUint8(), d); + break; + case qpid::types::VAR_UINT16: + writeUShort(value.asUint16(), d); + break; + case qpid::types::VAR_UINT32: + writeUInt(value.asUint32(), d); + break; + case qpid::types::VAR_UINT64: + writeULong(value.asUint64(), d); + break; + case qpid::types::VAR_INT8: + writeByte(value.asInt8(), d); + break; + case qpid::types::VAR_INT16: + writeShort(value.asInt16(), d); + break; + case qpid::types::VAR_INT32: + writeInt(value.asInt32(), d); + break; + case qpid::types::VAR_INT64: + writeLong(value.asInt64(), d); + break; + case qpid::types::VAR_FLOAT: + writeFloat(value.asFloat(), d); + break; + case qpid::types::VAR_DOUBLE: + writeDouble(value.asDouble(), d); + break; + case qpid::types::VAR_STRING: + if (value.getEncoding() == UTF8) { + writeString(value.getString(), d); + } else if (value.getEncoding() == ASCII) { + writeSymbol(value.getString(), d); + } else { + writeBinary(value.getString(), d); + } + break; + case qpid::types::VAR_MAP: + writeMap(value.asMap(), d); + break; + case qpid::types::VAR_LIST: + writeList(value.asList(), d); + break; + case qpid::types::VAR_UUID: + writeUuid(value.asUuid(), d); + break; + } + +} void Encoder::writeDescriptor(const Descriptor& d) { diff --git a/qpid/cpp/src/qpid/amqp/Encoder.h b/qpid/cpp/src/qpid/amqp/Encoder.h index c661e4ac5d..d6c0054a14 100644 --- a/qpid/cpp/src/qpid/amqp/Encoder.h +++ b/qpid/cpp/src/qpid/amqp/Encoder.h @@ -23,12 +23,15 @@ */ #include "qpid/sys/IntegerTypes.h" #include "qpid/amqp/Constructor.h" +#include <list> +#include <map> #include <stddef.h> #include <string> namespace qpid { namespace types { class Uuid; +class Variant; } namespace amqp { struct CharSequence; @@ -91,6 +94,10 @@ class Encoder void endArray8(size_t count, void*); void endArray32(size_t count, void*); + void writeValue(const qpid::types::Variant&, const Descriptor* d=0); + void writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); + void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); + void writeDescriptor(const Descriptor&); Encoder(char* data, size_t size); size_t getPosition(); diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.cpp b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp new file mode 100644 index 0000000000..f2ca8e8805 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ListBuilder.h" + +namespace qpid { +namespace amqp { + +ListBuilder::ListBuilder() : DataBuilder(qpid::types::Variant::List()) {} + +qpid::types::Variant::List& ListBuilder::getList() +{ + return getValue().asList(); +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.h b/qpid/cpp/src/qpid/amqp/ListBuilder.h new file mode 100644 index 0000000000..ee6af62539 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/ListBuilder.h @@ -0,0 +1,40 @@ +#ifndef QPID_AMQP_LISTBUILDER_H +#define QPID_AMQP_LISTBUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataBuilder.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant::List from a data stream + */ +class ListBuilder : public DataBuilder +{ + public: + ListBuilder(); + qpid::types::Variant::List& getList(); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LISTBUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp index a554497791..ce8eea038e 100644 --- a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp @@ -19,112 +19,12 @@ * */ #include "MapBuilder.h" -#include <assert.h> namespace qpid { namespace amqp { -namespace { -const std::string BINARY("binary"); -const std::string UTF8("utf8"); -const std::string ASCII("ascii"); -} - +MapBuilder::MapBuilder() : DataBuilder(qpid::types::Variant::Map()) {} qpid::types::Variant::Map MapBuilder::getMap() { - return map; -} -const qpid::types::Variant::Map MapBuilder::getMap() const -{ - return map; -} - -void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*) -{ - map[std::string(key.data, key.size)] = qpid::types::Variant(); -} -void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} -void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) -{ - assert(value.size == 16); - map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data); -} - -void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) -{ - map[std::string(key.data, key.size)] = value; -} - -void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*) -{ - qpid::types::Variant& v = map[std::string(key.data, key.size)]; - v = std::string(value.data, value.size); - v.setEncoding(BINARY); -} - -void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) -{ - qpid::types::Variant& v = map[std::string(key.data, key.size)]; - v = std::string(value.data, value.size); - v.setEncoding(UTF8); -} - -void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) -{ - qpid::types::Variant& v = map[std::string(key.data, key.size)]; - v = std::string(value.data, value.size); - v.setEncoding(ASCII); + return getValue().asMap(); } }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.h b/qpid/cpp/src/qpid/amqp/MapBuilder.h index 0e3b95f633..500d2e6db3 100644 --- a/qpid/cpp/src/qpid/amqp/MapBuilder.h +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.h @@ -21,42 +21,19 @@ * under the License. * */ -#include "MapReader.h" -#include "qpid/types/Variant.h" +#include "DataBuilder.h" namespace qpid { namespace amqp { /** - * Utility to build a Variant::Map from a data stream (doesn't handle - * nested maps or lists yet) + * Utility to build a Variant::Map from a data stream */ -class MapBuilder : public MapReader +class MapBuilder : public DataBuilder { public: - void onNullValue(const CharSequence& /*key*/, const Descriptor*); - void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*); - void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*); - void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*); - void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*); - void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*); - void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*); - void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*); - void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*); - void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*); - void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*); - void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*); - void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); - void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*); - - void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); - void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); - void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); - + MapBuilder(); qpid::types::Variant::Map getMap(); - const qpid::types::Variant::Map getMap() const; - private: - qpid::types::Variant::Map map; }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp index 3b493d1de7..beaea2befd 100644 --- a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp +++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp @@ -156,68 +156,6 @@ void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large); } -void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large) -{ - void* token = large ? startMap32(d) : startMap8(d); - for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - writeString(i->first); - switch (i->second.getType()) { - case qpid::types::VAR_MAP: - case qpid::types::VAR_LIST: - //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method) - QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0"); - case qpid::types::VAR_VOID: - writeNull(); - break; - case qpid::types::VAR_BOOL: - writeBoolean(i->second); - break; - case qpid::types::VAR_UINT8: - writeUByte(i->second); - break; - case qpid::types::VAR_UINT16: - writeUShort(i->second); - break; - case qpid::types::VAR_UINT32: - writeUInt(i->second); - break; - case qpid::types::VAR_UINT64: - writeULong(i->second); - break; - case qpid::types::VAR_INT8: - writeByte(i->second); - break; - case qpid::types::VAR_INT16: - writeShort(i->second); - break; - case qpid::types::VAR_INT32: - writeInt(i->second); - break; - case qpid::types::VAR_INT64: - writeULong(i->second); - break; - case qpid::types::VAR_FLOAT: - writeFloat(i->second); - break; - case qpid::types::VAR_DOUBLE: - writeDouble(i->second); - break; - case qpid::types::VAR_STRING: - if (i->second.getEncoding() == BINARY) { - writeBinary(i->second); - } else { - writeString(i->second); - } - break; - case qpid::types::VAR_UUID: - writeUuid(i->second); - break; - } - } - if (large) endMap32(properties.size()*2, token); - else endMap8(properties.size()*2, token); -} - size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) { return getEncodedSize(h) + getEncodedSize(p, ap, d); @@ -288,46 +226,56 @@ size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map { size_t total = 0; for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { - total += 1/*code*/ + encodedSize(i->first); - - switch (i->second.getType()) { - case qpid::types::VAR_MAP: - case qpid::types::VAR_LIST: - case qpid::types::VAR_VOID: - case qpid::types::VAR_BOOL: - total += 1; - break; - - case qpid::types::VAR_UINT8: - case qpid::types::VAR_INT8: - total += 2; - break; - - case qpid::types::VAR_UINT16: - case qpid::types::VAR_INT16: - total += 3; - break; - - case qpid::types::VAR_UINT32: - case qpid::types::VAR_INT32: - case qpid::types::VAR_FLOAT: - total += 5; - break; - - case qpid::types::VAR_UINT64: - case qpid::types::VAR_INT64: - case qpid::types::VAR_DOUBLE: - total += 9; - break; - - case qpid::types::VAR_UUID: - total += 17; - break; - - case qpid::types::VAR_STRING: - total += 1/*code*/ + encodedSize(i->second); - break; - } + total += 1/*code*/ + encodedSize(i->first) + getEncodedSizeForValue(i->second); + } + return total; +} + +size_t MessageEncoder::getEncodedSizeForValue(const qpid::types::Variant& value) +{ + size_t total = 0; + switch (value.getType()) { + case qpid::types::VAR_MAP: + total += getEncodedSize(value.asMap(), true); + break; + case qpid::types::VAR_LIST: + total += getEncodedSize(value.asList(), true); + break; + + case qpid::types::VAR_VOID: + case qpid::types::VAR_BOOL: + total += 1; + break; + + case qpid::types::VAR_UINT8: + case qpid::types::VAR_INT8: + total += 2; + break; + + case qpid::types::VAR_UINT16: + case qpid::types::VAR_INT16: + total += 3; + break; + + case qpid::types::VAR_UINT32: + case qpid::types::VAR_INT32: + case qpid::types::VAR_FLOAT: + total += 5; + break; + + case qpid::types::VAR_UINT64: + case qpid::types::VAR_INT64: + case qpid::types::VAR_DOUBLE: + total += 9; + break; + + case qpid::types::VAR_UUID: + total += 17; + break; + + case qpid::types::VAR_STRING: + total += 1/*code*/ + encodedSize(value.getString()); + break; } return total; } @@ -345,4 +293,20 @@ size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool return total; } + +size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::List& list, bool alwaysUseLargeList) +{ + size_t total(0); + for (qpid::types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + total += getEncodedSizeForValue(*i); + } + + //its not just the count that determines whether we can use a small list, but the aggregate size: + if (alwaysUseLargeList || list.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/; + else total += 1/*size*/ + 1/*count*/; + + total += 1 /*code for list itself*/; + + return total; +} }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.h b/qpid/cpp/src/qpid/amqp/MessageEncoder.h index e9ab2c5f89..a9fb7366af 100644 --- a/qpid/cpp/src/qpid/amqp/MessageEncoder.h +++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.h @@ -91,22 +91,26 @@ class MessageEncoder : public Encoder void writeApplicationProperties(const qpid::types::Variant::Map& properties); void writeApplicationProperties(const qpid::types::Variant::Map& properties, bool useLargeMap); - void writeMap(const qpid::types::Variant::Map& map, const Descriptor*, bool useLargeMap); - static size_t getEncodedSize(const Header&); static size_t getEncodedSize(const Properties&); static size_t getEncodedSize(const ApplicationProperties&); - static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&); + static size_t getEncodedSize(const qpid::types::Variant::List&, bool useLargeList); static size_t getEncodedSize(const qpid::types::Variant::Map&, bool useLargeMap); - static size_t getEncodedSize(const qpid::types::Variant::Map&); - static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&); + + static size_t getEncodedSizeForValue(const qpid::types::Variant& value); + static size_t getEncodedSizeForContent(const std::string&); + + //used in translating 0-10 content to 1.0, to determine buffer space needed static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&); + private: bool optimise; + static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&); + static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&); + static size_t getEncodedSizeForElements(const qpid::types::Variant::Map&); - static size_t getEncodedSizeForContent(const std::string&); }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.cpp b/qpid/cpp/src/qpid/amqp/MessageReader.cpp index 7a428bf4a9..726ee087d1 100644 --- a/qpid/cpp/src/qpid/amqp/MessageReader.cpp +++ b/qpid/cpp/src/qpid/amqp/MessageReader.cpp @@ -21,6 +21,7 @@ #include "qpid/amqp/MessageReader.h" #include "qpid/amqp/Descriptor.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/typecodes.h" #include "qpid/types/Uuid.h" #include "qpid/types/Variant.h" #include "qpid/log/Statement.h" @@ -55,40 +56,6 @@ const size_t REPLY_TO_GROUP_ID(12); } -/* -Reader& MessageReader::HeaderReader::getReader(size_t index) -{ - switch (index) { - case DURABLE: return durableReader; - case PRIORITY: return priorityReader; - case TTL: return ttlReader; - case FIRST_ACQUIRER: return firstAcquirerReader; - case DELIVERY_COUNT: return deliveryCountReader; - default: return noSuchFieldReader; - } -} - -Reader& MessageReader::PropertiesReader::getReader(size_t index) -{ - switch (index) { - case MESSAGE_ID: return messageIdReader; - case USER_ID: return userIdReader; - case TO: return toReader; - case SUBJECT: return subjectReader; - case REPLY_TO: return replyToReader; - case CORRELATION_ID: return correlationIdReader; - case CONTENT_TYPE: return contentTypeReader; - case CONTENT_ENCODING: return contentEncodingReader; - case ABSOLUTE_EXPIRY_TIME: return absoluteExpiryTimeReader; - case CREATION_TIME: return creationTimeReader; - case GROUP_ID: return groupIdReader; - case GROUP_SEQUENCE: return groupSequenceReader; - case REPLY_TO_GROUP_ID: return replyToGroupIdReader; - default: return noSuchFieldReader; - } -} -*/ - MessageReader::HeaderReader::HeaderReader(MessageReader& p) : parent(p), index(0) {} void MessageReader::HeaderReader::onBoolean(bool v, const Descriptor*) // durable, first-acquirer { @@ -234,8 +201,11 @@ bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const D } else if (descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE)) { delegate = &propertiesReader; return true; - } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(raw, *descriptor); + } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE)) { + onAmqpSequence(raw); + return false; + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(raw, qpid::amqp::typecodes::LIST_NAME); return false; } else { QPID_LOG(warning, "Unexpected described list: " << *descriptor); @@ -276,7 +246,7 @@ bool MessageReader::onStartMap(uint32_t count, const CharSequence& raw, const De onApplicationProperties(raw); return false; } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(raw, *descriptor); + onAmqpValue(raw, qpid::amqp::typecodes::MAP_NAME); return false; } else { QPID_LOG(warning, "Unexpected described map: " << *descriptor); @@ -300,8 +270,10 @@ void MessageReader::onBinary(const CharSequence& bytes, const Descriptor* descri } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got binary value with no descriptor."); - } else if (descriptor->match(DATA_SYMBOL, DATA_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(bytes, *descriptor); + } else if (descriptor->match(DATA_SYMBOL, DATA_CODE)) { + onData(bytes); + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(bytes, qpid::amqp::typecodes::BINARY_NAME); } else { QPID_LOG(warning, "Unexpected binary value with descriptor: " << *descriptor); } @@ -317,7 +289,7 @@ void MessageReader::onNull(const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant v; - onBody(v, *descriptor); + onAmqpValue(v); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got null value with no descriptor."); @@ -333,7 +305,7 @@ void MessageReader::onString(const CharSequence& v, const Descriptor* descriptor delegate->onString(v, descriptor); } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(v, *descriptor); + onAmqpValue(v, qpid::amqp::typecodes::STRING_NAME); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got string value with no descriptor."); @@ -349,7 +321,7 @@ void MessageReader::onSymbol(const CharSequence& v, const Descriptor* descriptor delegate->onSymbol(v, descriptor); } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(v, *descriptor); + onAmqpValue(v, qpid::amqp::typecodes::SYMBOL_NAME); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got symbol value with no descriptor."); @@ -367,7 +339,7 @@ void MessageReader::onBoolean(bool v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got boolean value with no descriptor."); @@ -385,7 +357,7 @@ void MessageReader::onUByte(uint8_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got ubyte value with no descriptor."); @@ -403,7 +375,7 @@ void MessageReader::onUShort(uint16_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got ushort value with no descriptor."); @@ -421,7 +393,7 @@ void MessageReader::onUInt(uint32_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got uint value with no descriptor."); @@ -439,7 +411,7 @@ void MessageReader::onULong(uint64_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got ulong value with no descriptor."); @@ -457,7 +429,7 @@ void MessageReader::onByte(int8_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got byte value with no descriptor."); @@ -475,7 +447,7 @@ void MessageReader::onShort(int16_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got short value with no descriptor."); @@ -493,7 +465,7 @@ void MessageReader::onInt(int32_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got int value with no descriptor."); @@ -511,7 +483,7 @@ void MessageReader::onLong(int64_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got long value with no descriptor."); @@ -529,7 +501,7 @@ void MessageReader::onFloat(float v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got float value with no descriptor."); @@ -547,7 +519,7 @@ void MessageReader::onDouble(double v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got double value with no descriptor."); @@ -564,7 +536,7 @@ void MessageReader::onUuid(const CharSequence& v, const Descriptor* descriptor) delegate->onUuid(v, descriptor); } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(v, *descriptor); + onAmqpValue(v, qpid::amqp::typecodes::UUID_NAME); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got uuid value with no descriptor."); @@ -582,7 +554,7 @@ void MessageReader::onTimestamp(int64_t v, const Descriptor* descriptor) } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { qpid::types::Variant body = v; - onBody(body, *descriptor); + onAmqpValue(body); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got timestamp value with no descriptor."); @@ -599,7 +571,8 @@ bool MessageReader::onStartArray(uint32_t count, const CharSequence& raw, const return delegate->onStartArray(count, raw, constructor, descriptor); } else { if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { - onBody(raw, *descriptor); + //TODO: might be better to decode this here + onAmqpValue(raw, qpid::amqp::typecodes::ARRAY_NAME); } else { if (!descriptor) { QPID_LOG(warning, "Expected described type but got array with no descriptor."); diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.h b/qpid/cpp/src/qpid/amqp/MessageReader.h index 5d26b288f5..2dc112a28a 100644 --- a/qpid/cpp/src/qpid/amqp/MessageReader.h +++ b/qpid/cpp/src/qpid/amqp/MessageReader.h @@ -98,172 +98,21 @@ class MessageReader : public Reader virtual void onApplicationProperties(const CharSequence&) = 0; virtual void onDeliveryAnnotations(const CharSequence&) = 0; virtual void onMessageAnnotations(const CharSequence&) = 0; - virtual void onBody(const CharSequence&, const Descriptor&) = 0; - virtual void onBody(const qpid::types::Variant&, const Descriptor&) = 0; + + virtual void onData(const CharSequence&) = 0; + virtual void onAmqpSequence(const CharSequence&) = 0; + virtual void onAmqpValue(const CharSequence&, const std::string& type) = 0; + virtual void onAmqpValue(const qpid::types::Variant&) = 0; + virtual void onFooter(const CharSequence&) = 0; QPID_COMMON_EXTERN CharSequence getBareMessage() const; private: - /* - class DurableReader : public Reader - { - public: - DurableReader(MessageReader&); - void onBoolean(bool v, const Descriptor*); - private: - MessageReader& parent; - }; - class PriorityReader : public Reader - { - public: - PriorityReader(MessageReader&); - void onUByte(uint8_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class TtlReader : public Reader - { - public: - TtlReader(MessageReader&); - void onUInt(uint32_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class FirstAcquirerReader : public Reader - { - public: - FirstAcquirerReader(MessageReader&); - void onBoolean(bool v, const Descriptor*); - private: - MessageReader& parent; - }; - class DeliveryCountReader : public Reader - { - public: - DeliveryCountReader(MessageReader&); - void onUInt(uint32_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class MessageIdReader : public Reader - { - public: - MessageIdReader(MessageReader&); - void onUuid(const qpid::types::Uuid& v, const Descriptor*); - void onULong(uint64_t v, const Descriptor*); - void onString(const CharSequence& v, const Descriptor*); - void onBinary(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class UserIdReader : public Reader - { - public: - UserIdReader(MessageReader&); - void onBinary(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class ToReader : public Reader - { - public: - ToReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class SubjectReader : public Reader + class HeaderReader : public Reader { public: - SubjectReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class ReplyToReader : public Reader - { - public: - ReplyToReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class CorrelationIdReader : public Reader - { - public: - CorrelationIdReader(MessageReader&); - void onUuid(const qpid::types::Uuid& v, const Descriptor*); - void onULong(uint64_t v, const Descriptor*); - void onString(const CharSequence& v, const Descriptor*); - void onBinary(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class ContentTypeReader : public Reader - { - public: - ContentTypeReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class ContentEncodingReader : public Reader - { - public: - ContentEncodingReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class AbsoluteExpiryTimeReader : public Reader - { - public: - AbsoluteExpiryTimeReader(MessageReader&); - void onTimestamp(int64_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class CreationTimeReader : public Reader - { - public: - CreationTimeReader(MessageReader&); - void onTimestamp(int64_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class GroupIdReader : public Reader - { - public: - GroupIdReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - class GroupSequenceReader : public Reader - { - public: - GroupSequenceReader(MessageReader&); - void onUInt(uint32_t v, const Descriptor*); - private: - MessageReader& parent; - }; - class ReplyToGroupIdReader : public Reader - { - public: - ReplyToGroupIdReader(MessageReader&); - void onString(const CharSequence& v, const Descriptor*); - private: - MessageReader& parent; - }; - */ - - class HeaderReader : public Reader //public ListReader - { - public: - //Reader& getReader(size_t index); - HeaderReader(MessageReader&); void onBoolean(bool v, const Descriptor*); // durable, first-acquirer void onUByte(uint8_t v, const Descriptor*); // priority @@ -273,11 +122,9 @@ class MessageReader : public Reader MessageReader& parent; size_t index; }; - class PropertiesReader : public Reader //public ListReader + class PropertiesReader : public Reader { public: - //Reader& getReader(size_t index); - PropertiesReader(MessageReader&); void onUuid(const CharSequence& v, const Descriptor*); // message-id, correlation-id void onULong(uint64_t v, const Descriptor*); // message-id, correlation-id diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h index 248d6df2df..e395fc25d7 100644 --- a/qpid/cpp/src/qpid/amqp/descriptors.h +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -52,6 +52,7 @@ const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE); const Descriptor MESSAGE_ANNOTATIONS(MESSAGE_ANNOTATIONS_CODE); const Descriptor PROPERTIES(PROPERTIES_CODE); const Descriptor APPLICATION_PROPERTIES(APPLICATION_PROPERTIES_CODE); +const Descriptor AMQP_VALUE(AMQP_VALUE_CODE); const Descriptor DATA(DATA_CODE); } diff --git a/qpid/cpp/src/qpid/amqp/typecodes.h b/qpid/cpp/src/qpid/amqp/typecodes.h index 3c6bd17b97..915b75ca3f 100644 --- a/qpid/cpp/src/qpid/amqp/typecodes.h +++ b/qpid/cpp/src/qpid/amqp/typecodes.h @@ -83,7 +83,7 @@ const uint8_t ARRAY32(0xf0); const std::string NULL_NAME("null"); -const std::string BOOLEAN_NAME("name"); +const std::string BOOLEAN_NAME("bool"); const std::string UBYTE_NAME("ubyte"); const std::string USHORT_NAME("ushort"); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index 3dcf1c14e6..5ea9664ea8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -21,9 +21,13 @@ #include "Message.h" #include "qpid/amqp/Decoder.h" #include "qpid/amqp/descriptors.h" -#include "qpid/amqp/Reader.h" -#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp/ListBuilder.h" +#include "qpid/amqp/MapBuilder.h" #include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp/Reader.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/encodings.h" #include "qpid/log/Statement.h" #include "qpid/framing/Buffer.h" #include <string.h> @@ -256,10 +260,52 @@ void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; } void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; } void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; } -void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; } -void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} + +void Message::onData(const qpid::amqp::CharSequence& v) { body = v; } +void Message::onAmqpSequence(const qpid::amqp::CharSequence& v) { body = v; bodyType = qpid::amqp::typecodes::LIST_NAME; } +void Message::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& t) +{ + body = v; + if (t == qpid::amqp::typecodes::STRING_NAME) { + bodyType = qpid::types::encodings::UTF8; + } else if (t == qpid::amqp::typecodes::SYMBOL_NAME) { + bodyType = qpid::types::encodings::ASCII; + } else if (t == qpid::amqp::typecodes::BINARY_NAME) { + bodyType = qpid::types::encodings::BINARY; + } else { + bodyType = t; + } +} +void Message::onAmqpValue(const qpid::types::Variant& v) { typedBody = v; } + void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; } +bool Message::isTypedBody() const +{ + return !typedBody.isVoid() || !bodyType.empty(); +} + +qpid::types::Variant Message::getTypedBody() const +{ + if (bodyType == qpid::amqp::typecodes::LIST_NAME) { + qpid::amqp::ListBuilder builder; + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + return builder.getList(); + } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) { + qpid::amqp::MapBuilder builder; + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + return builder.getMap(); + } else if (!bodyType.empty()) { + qpid::types::Variant value(std::string(body.data, body.size)); + value.setEncoding(bodyType); + return value; + } else { + return typedBody; + } +} + //PersistableMessage interface: void Message::encode(framing::Buffer& buffer) const diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index cbf8669fc1..c7e2df84aa 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -64,6 +64,8 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess qpid::amqp::CharSequence getBareMessage() const; qpid::amqp::CharSequence getBody() const; qpid::amqp::CharSequence getFooter() const; + bool isTypedBody() const; + qpid::types::Variant getTypedBody() const; Message(size_t size); char* getData(); @@ -109,6 +111,8 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess //body: qpid::amqp::CharSequence body; + qpid::types::Variant typedBody; + std::string bodyType; //footer: qpid::amqp::CharSequence footer; @@ -139,8 +143,12 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess void onApplicationProperties(const qpid::amqp::CharSequence&); void onDeliveryAnnotations(const qpid::amqp::CharSequence&); void onMessageAnnotations(const qpid::amqp::CharSequence&); - void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&); - void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&); + + void onData(const qpid::amqp::CharSequence&); + void onAmqpSequence(const qpid::amqp::CharSequence&); + void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type); + void onAmqpValue(const qpid::types::Variant&); + void onFooter(const qpid::amqp::CharSequence&); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 668b500570..366218c002 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -88,7 +88,7 @@ bool expired(const sys::AbsTime& start, double timeout) ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), - retries(0), reconnectOnLimitExceeded(true) + retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false) { setOptions(options); urls.insert(urls.begin(), url); @@ -157,8 +157,10 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) settings.sslCertName = value.asString(); } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { reconnectOnLimitExceeded = value; - } else if (name == "client-properties") { + } else if (name == "client-properties" || name == "client_properties") { amqp_0_10::translate(value.asMap(), settings.clientProperties); + } else if (name == "disable-auto-decode" || name == "disable_auto_decode") { + disableAutoDecode = value; } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } @@ -346,4 +348,9 @@ std::string ConnectionImpl::getAuthenticatedUsername() return connection.getNegotiatedSettings().username; } +bool ConnectionImpl::getAutoDecode() const +{ + return !disableAutoDecode; +} + }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index d1ac4533d5..00ac30a6df 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -53,6 +53,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void setOption(const std::string& name, const qpid::types::Variant& value); bool backoff(); std::string getAuthenticatedUsername(); + bool getAutoDecode() const; private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -70,6 +71,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl double maxReconnectInterval; int32_t retries; bool reconnectOnLimitExceeded; + bool disableAutoDecode; void setOptions(const qpid::types::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index db6e843cf6..27a2107702 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -399,7 +399,7 @@ void populate(qpid::messaging::Message& message, FrameSet& command) //need to be able to link the message back to the transfer it was delivered by //e.g. for rejecting. MessageImplAccess::get(message).setInternalId(command.getId()); - + message.setContent(command.getContent()); populateHeaders(message, command.getHeaders()); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 348f9e160c..41b30be4fe 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -50,8 +50,20 @@ const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); void OutgoingMessage::convert(const qpid::messaging::Message& from) { //TODO: need to avoid copying as much as possible - message.setData(from.getContent()); - message.getMessageProperties().setContentType(from.getContentType()); + if (from.getContentObject().getType() == qpid::types::VAR_MAP) { + std::string content; + qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content); + message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType); + message.setData(content); + } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) { + std::string content; + qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content); + message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType); + message.setData(content); + } else { + message.setData(from.getContent()); + message.getMessageProperties().setContentType(from.getContentType()); + } if ( !from.getCorrelationId().empty() ) message.getMessageProperties().setCorrelationId(from.getCorrelationId()); message.getMessageProperties().setUserId(from.getUserId()); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 7e8de21247..c3c8b83dc5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -25,6 +25,7 @@ #include "qpid/messaging/exceptions.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" +#include "qpid/amqp_0_10/Codecs.h" namespace qpid { namespace client { @@ -148,9 +149,9 @@ qpid::messaging::Address ReceiverImpl::getAddress() const } ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a) : + const qpid::messaging::Address& a, bool autoDecode_) : - parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -159,7 +160,20 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D sys::Mutex::ScopedLock l(lock); if (state == CANCELLED) return false; } - return parent->get(*this, message, timeout); + if (parent->get(*this, message, timeout)) { + if (autoDecode) { + if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { + message.getContentObject() = qpid::types::Variant::Map(); + decode(message, message.getContentObject().asMap()); + } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { + message.getContentObject() = qpid::types::Variant::List(); + decode(message, message.getContentObject().asList()); + } + } + return true; + } else { + return false; + } } bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 4dba76c8d9..0d3366907b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -48,7 +48,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED}; ReceiverImpl(SessionImpl& parent, const std::string& name, - const qpid::messaging::Address& address); + const qpid::messaging::Address& address, bool autoDecode); void init(qpid::client::AsyncSession session, AddressResolution& resolver); bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout); @@ -74,6 +74,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl const std::string destination; const qpid::messaging::Address address; const uint32_t byteCredit; + const bool autoDecode; State state; std::auto_ptr<MessageSource> source; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index e4c2c6afb8..982bfa3503 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -207,7 +207,7 @@ Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, receivers); - Receiver receiver(new ReceiverImpl(*this, name, address)); + Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode())); getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver); receivers[name] = receiver; return receiver; diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp index 0f03bc8ca3..b40ae06cbc 100644 --- a/qpid/cpp/src/qpid/messaging/Message.cpp +++ b/qpid/cpp/src/qpid/messaging/Message.cpp @@ -31,6 +31,10 @@ using namespace qpid::types; Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {} Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {} +Message::Message(qpid::types::Variant& c) : impl(new MessageImpl(std::string())) +{ + setContentObject(c); +} Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {} Message::~Message() { delete impl; } @@ -75,6 +79,13 @@ void Message::setContent(const std::string& c) { impl->setBytes(c); } void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); } std::string Message::getContent() const { return impl->getBytes(); } +void Message::setContentBytes(const std::string& c) { impl->setBytes(c); } +std::string Message::getContentBytes() const { return impl->getBytes(); } + +qpid::types::Variant& Message::getContentObject() { return impl->getContent(); } +void Message::setContentObject(const qpid::types::Variant& c) { impl->getContent() = c; } +const qpid::types::Variant& Message::getContentObject() const { return impl->getContent(); } + const char* Message::getContentPtr() const { return impl->getBytes().data(); diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp index fc9bc5dfa1..7b5854745e 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,19 +30,21 @@ const std::string EMPTY_STRING = ""; using namespace qpid::types; -MessageImpl::MessageImpl(const std::string& c) : +MessageImpl::MessageImpl(const std::string& c) : priority(0), ttl(0), durable(false), redelivered(false), bytes(c), + contentDecoded(false), internalId(0) {} -MessageImpl::MessageImpl(const char* chars, size_t count) : +MessageImpl::MessageImpl(const char* chars, size_t count) : priority(0), ttl(0), durable (false), redelivered(false), bytes(chars, count), + contentDecoded(false), internalId(0) {} void MessageImpl::setReplyTo(const Address& d) @@ -167,21 +169,35 @@ void MessageImpl::setBytes(const char* chars, size_t count) bytes.assign(chars, count); updated(); } -void MessageImpl::appendBytes(const char* chars, size_t count) -{ - bytes.append(chars, count); - updated(); -} const std::string& MessageImpl::getBytes() const { - if (!bytes.size() && encoded) encoded->getBody(bytes); - return bytes; + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } + if (bytes.empty() && !content.isVoid()) return content.getString(); + else return bytes; } std::string& MessageImpl::getBytes() { - if (!bytes.size() && encoded) encoded->getBody(bytes); updated();//have to assume body may be edited, invalidating our message - return bytes; + if (bytes.empty() && !content.isVoid()) return content.getString(); + else return bytes; +} + +qpid::types::Variant& MessageImpl::getContent() +{ + updated();//have to assume content may be edited, invalidating our message + return content; +} + +const qpid::types::Variant& MessageImpl::getContent() const +{ + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } + return content; } void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } @@ -197,7 +213,10 @@ void MessageImpl::updated() if (!userId.size() && encoded) encoded->getUserId(userId); if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId); if (!headers.size() && encoded) encoded->populate(headers); - if (!bytes.size() && encoded) encoded->getBody(bytes); + if (encoded && !contentDecoded) { + encoded->getBody(bytes, content); + contentDecoded = true; + } encoded.reset(); } @@ -210,5 +229,4 @@ const MessageImpl& MessageImplAccess::get(const Message& msg) { return *msg.impl; } - }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h index 915c790153..6387daafb7 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.h +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,6 +47,8 @@ class MessageImpl mutable qpid::types::Variant::Map headers; mutable std::string bytes; + mutable qpid::types::Variant content; + mutable bool contentDecoded; boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded; qpid::framing::SequenceNumber internalId; @@ -87,9 +89,10 @@ class MessageImpl void setBytes(const std::string& bytes); void setBytes(const char* chars, size_t count); - void appendBytes(const char* chars, size_t count); const std::string& getBytes() const; std::string& getBytes(); + qpid::types::Variant& getContent(); + const qpid::types::Variant& getContent() const; void setInternalId(qpid::framing::SequenceNumber id); qpid::framing::SequenceNumber getInternalId(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp index b4e0819980..a0bf8dc575 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -22,13 +22,18 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/amqp/Decoder.h" +#include "qpid/amqp/DataBuilder.h" +#include "qpid/amqp/ListBuilder.h" +#include "qpid/amqp/MapBuilder.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" #include <boost/lexical_cast.hpp> #include <string.h> namespace qpid { namespace messaging { namespace amqp { - using namespace qpid::amqp; EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0) @@ -178,9 +183,39 @@ void EncodedMessage::getCorrelationId(std::string& s) const { correlationId.assign(s); } -void EncodedMessage::getBody(std::string& s) const +void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const { - s.assign(body.data, body.size); + //TODO: based on section type, populate content + if (!content.isVoid()) { + c = content;//integer types, floats, bool etc + //TODO: populate raw data? + } else { + if (bodyType.empty() + || bodyType == qpid::amqp::typecodes::BINARY_NAME + || bodyType == qpid::amqp::typecodes::STRING_NAME + || bodyType == qpid::amqp::typecodes::SYMBOL_NAME) + { + c = std::string(body.data, body.size); + c.setEncoding(bodyType); + } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) { + qpid::amqp::ListBuilder builder; + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getList(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) { + qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map()); + qpid::amqp::Decoder decoder(body.data, body.size); + decoder.read(builder); + c = builder.getValue().asMap(); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) { + if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data); + raw.assign(body.data, body.size); + } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) { + raw.assign(body.data, body.size); + } + } } qpid::amqp::CharSequence EncodedMessage::getBody() const @@ -216,6 +251,7 @@ bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) c } + EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m) { //set up defaults as needed: @@ -252,12 +288,32 @@ void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequenc void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; } void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; } void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; } -void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) + +void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v) +{ + em.body = v; +} +void EncodedMessage::InitialScan::onAmqpSequence(const qpid::amqp::CharSequence& v) { - //TODO: how to communicate the type, i.e. descriptor? em.body = v; + em.bodyType = qpid::amqp::typecodes::LIST_NAME; } -void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} +void EncodedMessage::InitialScan::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& type) +{ + em.body = v; + if (type == qpid::amqp::typecodes::STRING_NAME) { + em.bodyType = qpid::types::encodings::UTF8; + } else if (type == qpid::amqp::typecodes::SYMBOL_NAME) { + em.bodyType = qpid::types::encodings::ASCII; + } else { + em.bodyType = type; + } +} +void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v) +{ + em.content = v; +} + void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; } }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h index 09a9d948d5..233b718dec 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h @@ -89,13 +89,14 @@ class EncodedMessage void getMessageId(std::string&) const; void getUserId(std::string&) const; void getCorrelationId(std::string&) const; + void populate(qpid::types::Variant::Map&) const; + void getBody(std::string&, qpid::types::Variant&) const; void init(qpid::messaging::MessageImpl&); - void populate(qpid::types::Variant::Map&) const; - void getBody(std::string&) const; qpid::amqp::CharSequence getBareMessage() const; qpid::amqp::CharSequence getBody() const; bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const; + private: size_t size; char* data; @@ -130,8 +131,12 @@ class EncodedMessage void onApplicationProperties(const qpid::amqp::CharSequence&); void onDeliveryAnnotations(const qpid::amqp::CharSequence&); void onMessageAnnotations(const qpid::amqp::CharSequence&); - void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&); - void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&); + + void onData(const qpid::amqp::CharSequence&); + void onAmqpSequence(const qpid::amqp::CharSequence&); + void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type); + void onAmqpValue(const qpid::types::Variant&); + void onFooter(const qpid::amqp::CharSequence&); private: EncodedMessage& em; @@ -164,7 +169,11 @@ class EncodedMessage qpid::amqp::CharSequence replyToGroupId; //application-properties: qpid::amqp::CharSequence applicationProperties; + //application data: qpid::amqp::CharSequence body; + std::string bodyType; + qpid::types::Variant content; + //footer: qpid::amqp::CharSequence footer; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 6e3a432a59..1043a0c62c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -440,7 +440,15 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co PropertiesAdapter properties(msg, address.getSubject()); ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); //compute size: - encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, applicationProperties, msg.getBytes())); + size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) + + qpid::amqp::MessageEncoder::getEncodedSize(properties) + + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties); + if (msg.getContent().isVoid()) { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes()); + } else { + contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/; + } + encoded.resize(contentSize); QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); //write header: @@ -451,7 +459,12 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co //write application-properties encoder.writeApplicationProperties(applicationProperties); //write body - if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + if (!msg.getContent().isVoid()) { + //write as AmqpValue + encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE); + } else if (msg.getBytes().size()) { + encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + } if (encoder.getPosition() < encoded.getSize()) { QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); encoded.trim(encoder.getPosition()); diff --git a/qpid/cpp/src/qpid/types/encodings.h b/qpid/cpp/src/qpid/types/encodings.h new file mode 100644 index 0000000000..827b6964b9 --- /dev/null +++ b/qpid/cpp/src/qpid/types/encodings.h @@ -0,0 +1,33 @@ +#ifndef QPID_TYPES_ENCODINGS_H +#define QPID_TYPES_ENCODINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace types { +namespace encodings { +const std::string BINARY("binary"); +const std::string UTF8("utf8"); +const std::string ASCII("ascii"); +} +}} // namespace qpid::types + +#endif /*!QPID_TYPES_ENCODINGS_H*/ diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index d43598f551..d00b828ddc 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -232,8 +232,10 @@ int main(int argc, char ** argv) std::cout << "Properties: " << msg.getProperties() << std::endl; std::cout << std::endl; } - if (opts.printContent) - std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.printContent) { + if (!msg.getContentObject().isVoid()) std::cout << msg.getContentObject() << std::endl; + else std::cout << msg.getContent() << std::endl; + } if (opts.messages && count >= opts.messages) done = true; } } diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index 72ccf1466d..efc6f95448 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -262,9 +262,8 @@ class MapContentGenerator : public ContentGenerator { public: MapContentGenerator(const Options& opt) : opts(opt) {} virtual bool setContent(Message& msg) { - Variant::Map map; - opts.setEntries(map); - encode(map, msg); + msg.getContentObject() = qpid::types::Variant::Map(); + opts.setEntries(msg.getContentObject().asMap()); return true; } private: |
