summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-13 15:06:42 +0000
committerGordon Sim <gsim@apache.org>2013-08-13 15:06:42 +0000
commit144f3c698bdddf22509691a4f285305e9fd83291 (patch)
tree2a399e0c06586c0ab5cab8986eb317767ac4080d /qpid/cpp
parent4c5376ff911c42a9e43115d38c6751573d8af514 (diff)
downloadqpid-python-144f3c698bdddf22509691a4f285305e9fd83291.tar.gz
QPID-5040: support for sending and receiving messages with AmqpValue sections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/messaging/drain.cpp10
-rw-r--r--qpid/cpp/examples/messaging/map_receiver.cpp8
-rw-r--r--qpid/cpp/examples/messaging/map_sender.cpp6
-rw-r--r--qpid/cpp/examples/messaging/spout.cpp2
-rw-r--r--qpid/cpp/include/qpid/messaging/Message.h22
-rw-r--r--qpid/cpp/src/CMakeLists.txt4
-rw-r--r--qpid/cpp/src/qpid/amqp/DataBuilder.cpp194
-rw-r--r--qpid/cpp/src/qpid/amqp/DataBuilder.h78
-rw-r--r--qpid/cpp/src/qpid/amqp/Encoder.cpp85
-rw-r--r--qpid/cpp/src/qpid/amqp/Encoder.h7
-rw-r--r--qpid/cpp/src/qpid/amqp/ListBuilder.cpp33
-rw-r--r--qpid/cpp/src/qpid/amqp/ListBuilder.h40
-rw-r--r--qpid/cpp/src/qpid/amqp/MapBuilder.cpp104
-rw-r--r--qpid/cpp/src/qpid/amqp/MapBuilder.h31
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageEncoder.cpp168
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageEncoder.h16
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageReader.cpp85
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageReader.h169
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h1
-rw-r--r--qpid/cpp/src/qpid/amqp/typecodes.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.cpp54
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h12
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp16
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp20
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/Message.cpp11
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.cpp48
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.h9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h17
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp17
-rw-r--r--qpid/cpp/src/qpid/types/encodings.h33
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp6
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp5
38 files changed, 880 insertions, 521 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp
index 563e5e5060..fe4ea7950d 100644
--- a/qpid/cpp/examples/messaging/drain.cpp
+++ b/qpid/cpp/examples/messaging/drain.cpp
@@ -90,13 +90,11 @@ int main(int argc, char** argv)
int count = options.getCount();
Message message;
int i = 0;
-
+
while (receiver.fetch(message, timeout)) {
std::cout << "Message(properties=" << message.getProperties() << ", content='" ;
if (message.getContentType() == "amqp/map") {
- Variant::Map map;
- decode(message, map);
- std::cout << map;
+ std::cout << message.getContentObject().asMap();
} else {
std::cout << message.getContent();
}
@@ -106,7 +104,7 @@ int main(int argc, char** argv)
break;
}
receiver.close();
- session.close();
+ session.close();
connection.close();
return 0;
} catch(const std::exception& error) {
@@ -114,5 +112,5 @@ int main(int argc, char** argv)
connection.close();
}
}
- return 1;
+ return 1;
}
diff --git a/qpid/cpp/examples/messaging/map_receiver.cpp b/qpid/cpp/examples/messaging/map_receiver.cpp
index 081f7394a8..96bc76b821 100644
--- a/qpid/cpp/examples/messaging/map_receiver.cpp
+++ b/qpid/cpp/examples/messaging/map_receiver.cpp
@@ -39,15 +39,13 @@ int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
const char* address = argc>2 ? argv[2] : "message_queue; {create: always}";
std::string connectionOptions = argc > 3 ? argv[3] : "";
-
+
Connection connection(url, connectionOptions);
try {
connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(address);
- Variant::Map content;
- decode(receiver.fetch(), content);
- std::cout << content << std::endl;
+ std::cout << receiver.fetch().getContentObject() << std::endl;
session.acknowledge();
receiver.close();
connection.close();
@@ -56,5 +54,5 @@ int main(int argc, char** argv) {
std::cout << error.what() << std::endl;
connection.close();
}
- return 1;
+ return 1;
}
diff --git a/qpid/cpp/examples/messaging/map_sender.cpp b/qpid/cpp/examples/messaging/map_sender.cpp
index 8ce3e1d8ec..81ac7320d8 100644
--- a/qpid/cpp/examples/messaging/map_sender.cpp
+++ b/qpid/cpp/examples/messaging/map_sender.cpp
@@ -39,7 +39,7 @@ int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
const char* address = argc>2 ? argv[2] : "message_queue; {create: always}";
std::string connectionOptions = argc > 3 ? argv[3] : "";
-
+
Connection connection(url, connectionOptions);
try {
connection.open();
@@ -57,8 +57,8 @@ int main(int argc, char** argv) {
colours.push_back(Variant("white"));
content["colours"] = colours;
content["uuid"] = Uuid(true);
- encode(content, message);
-
+ message.setContentObject(content);
+
sender.send(message, true);
connection.close();
diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp
index 72fcdc7c65..e80b434b3e 100644
--- a/qpid/cpp/examples/messaging/spout.cpp
+++ b/qpid/cpp/examples/messaging/spout.cpp
@@ -145,7 +145,7 @@ int main(int argc, char** argv)
if (options.entries.size()) {
Variant::Map content;
options.setEntries(content);
- encode(content, message);
+ message.getContentObject() = content;
} else if (options.content.size()) {
message.setContent(options.content);
message.setContentType("text/plain");
diff --git a/qpid/cpp/include/qpid/messaging/Message.h b/qpid/cpp/include/qpid/messaging/Message.h
index 5b14c7cf27..10569eb006 100644
--- a/qpid/cpp/include/qpid/messaging/Message.h
+++ b/qpid/cpp/include/qpid/messaging/Message.h
@@ -42,6 +42,7 @@ class MessageImpl;
class QPID_MESSAGING_CLASS_EXTERN Message
{
public:
+ QPID_MESSAGING_EXTERN Message(qpid::types::Variant&);
QPID_MESSAGING_EXTERN Message(const std::string& bytes = std::string());
QPID_MESSAGING_EXTERN Message(const char*, size_t);
QPID_MESSAGING_EXTERN Message(const Message&);
@@ -164,6 +165,27 @@ class QPID_MESSAGING_CLASS_EXTERN Message
/** Get the content as a std::string */
QPID_MESSAGING_EXTERN std::string getContent() const;
+ /** Get the content as raw bytes (an alias for getContent() */
+ QPID_MESSAGING_EXTERN std::string getContentBytes() const;
+ /** Set the content as raw bytes (an alias for setContent() */
+ QPID_MESSAGING_EXTERN void setContentBytes(const std::string&);
+ /**
+ * Get the content as a Variant, which can represent an object of
+ * different types. This can be used for content representing a
+ * map or a list for example.
+ */
+ QPID_MESSAGING_EXTERN qpid::types::Variant& getContentObject();
+ /**
+ * Get the content as a Variant, which can represent an object of
+ * different types. This can be used for content representing a
+ * map or a list for example.
+ */
+ QPID_MESSAGING_EXTERN const qpid::types::Variant& getContentObject() const;
+ /**
+ * Set the content using a Variant, which can represent an object
+ * of different types.
+ */
+ QPID_MESSAGING_EXTERN void setContentObject(const qpid::types::Variant&);
/**
* Get a const pointer to the start of the content data. The
* memory pointed to is owned by the message. The getContentSize()
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 081fce5220..3c77c4e33f 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1026,12 +1026,16 @@ set (qpidcommon_SOURCES
qpid/amqp_0_10/Codecs.cpp
qpid/amqp/CharSequence.h
qpid/amqp/CharSequence.cpp
+ qpid/amqp/DataBuilder.h
+ qpid/amqp/DataBuilder.cpp
qpid/amqp/Decoder.h
qpid/amqp/Decoder.cpp
qpid/amqp/Descriptor.h
qpid/amqp/Descriptor.cpp
qpid/amqp/Encoder.h
qpid/amqp/Encoder.cpp
+ qpid/amqp/ListBuilder.h
+ qpid/amqp/ListBuilder.cpp
qpid/amqp/MapHandler.h
qpid/amqp/MapEncoder.h
qpid/amqp/MapEncoder.cpp
diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.cpp b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp
new file mode 100644
index 0000000000..805125eb7f
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DataBuilder.h"
+#include "CharSequence.h"
+#include "qpid/log/Statement.h"
+#include "qpid/types/encodings.h"
+
+namespace qpid {
+namespace amqp {
+
+void DataBuilder::onNull(const Descriptor*)
+{
+ handle(qpid::types::Variant());
+}
+void DataBuilder::onBoolean(bool v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onUByte(uint8_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onUShort(uint16_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onUInt(uint32_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onULong(uint64_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onByte(int8_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onShort(int16_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onInt(int32_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onLong(int64_t v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onFloat(float v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onDouble(double v, const Descriptor*)
+{
+ handle(v);
+}
+void DataBuilder::onUuid(const CharSequence& v, const Descriptor*)
+{
+ if (v.size == qpid::types::Uuid::SIZE) {
+ handle(qpid::types::Uuid(v.data));
+ }
+}
+void DataBuilder::onTimestamp(int64_t v, const Descriptor*)
+{
+ handle(v);
+}
+
+void DataBuilder::handle(const qpid::types::Variant& v)
+{
+ switch (nested.top()->getType()) {
+ case qpid::types::VAR_MAP:
+ nested.push(&nested.top()->asMap()[v.asString()]);
+ break;
+ case qpid::types::VAR_LIST:
+ nested.top()->asList().push_back(v);
+ break;
+ default:
+ *(nested.top()) = v;
+ nested.pop();
+ break;
+ }
+}
+
+void DataBuilder::onBinary(const CharSequence& v, const Descriptor*)
+{
+ onString(std::string(v.data, v.size), qpid::types::encodings::BINARY);
+}
+void DataBuilder::onString(const CharSequence& v, const Descriptor*)
+{
+ onString(std::string(v.data, v.size), qpid::types::encodings::UTF8);
+}
+void DataBuilder::onSymbol(const CharSequence& v, const Descriptor*)
+{
+ onString(std::string(v.data, v.size), qpid::types::encodings::ASCII);
+}
+
+void DataBuilder::onString(const std::string& value, const std::string& encoding)
+{
+ switch (nested.top()->getType()) {
+ case qpid::types::VAR_MAP:
+ nested.push(&nested.top()->asMap()[value]);
+ break;
+ case qpid::types::VAR_LIST:
+ nested.top()->asList().push_back(qpid::types::Variant(value));
+ nested.top()->asList().back().setEncoding(encoding);
+ break;
+ default:
+ qpid::types::Variant& v = *(nested.top());
+ v = value;
+ v.setEncoding(encoding);
+ nested.pop();
+ break;
+ }
+}
+
+bool DataBuilder::proceed()
+{
+ return !nested.empty();
+}
+
+bool DataBuilder::nest(const qpid::types::Variant& n)
+{
+ switch (nested.top()->getType()) {
+ case qpid::types::VAR_MAP:
+ QPID_LOG(error, QPID_MSG("Expecting map key; got " << n));
+ break;
+ case qpid::types::VAR_LIST:
+ nested.top()->asList().push_back(n);
+ nested.push(&nested.top()->asList().back());
+ break;
+ default:
+ qpid::types::Variant& value = *(nested.top());
+ value = n;
+ nested.pop();
+ nested.push(&value);
+ break;
+ }
+ return true;
+}
+
+bool DataBuilder::onStartList(uint32_t, const CharSequence&, const Descriptor*)
+{
+ return nest(qpid::types::Variant::List());
+}
+void DataBuilder::onEndList(uint32_t /*count*/, const Descriptor*)
+{
+ nested.pop();
+}
+bool DataBuilder::onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*)
+{
+ return nest(qpid::types::Variant::Map());
+}
+void DataBuilder::onEndMap(uint32_t /*count*/, const Descriptor*)
+{
+ nested.pop();
+}
+bool DataBuilder::onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*)
+{
+ return onStartList(count, CharSequence::create(), 0);
+}
+void DataBuilder::onEndArray(uint32_t count, const Descriptor*)
+{
+ onEndList(count, 0);
+}
+qpid::types::Variant& DataBuilder::getValue()
+{
+ return base;
+}
+DataBuilder::DataBuilder(qpid::types::Variant v) : base(v)
+{
+ nested.push(&base);
+}
+DataBuilder::~DataBuilder() {}
+}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.h b/qpid/cpp/src/qpid/amqp/DataBuilder.h
new file mode 100644
index 0000000000..672584d73d
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/DataBuilder.h
@@ -0,0 +1,78 @@
+#ifndef QPID_AMQP_DATABUILDER_H
+#define QPID_AMQP_DATABUILDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Reader.h"
+#include "qpid/types/Variant.h"
+#include <stack>
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Utility to build a Variant based structure (or value) from a data stream
+ */
+class DataBuilder : public Reader
+{
+ public:
+ DataBuilder(qpid::types::Variant);
+ virtual ~DataBuilder();
+ void onNull(const Descriptor*);
+ void onBoolean(bool, const Descriptor*);
+ void onUByte(uint8_t, const Descriptor*);
+ void onUShort(uint16_t, const Descriptor*);
+ void onUInt(uint32_t, const Descriptor*);
+ void onULong(uint64_t, const Descriptor*);
+ void onByte(int8_t, const Descriptor*);
+ void onShort(int16_t, const Descriptor*);
+ void onInt(int32_t, const Descriptor*);
+ void onLong(int64_t, const Descriptor*);
+ void onFloat(float, const Descriptor*);
+ void onDouble(double, const Descriptor*);
+ void onUuid(const CharSequence&, const Descriptor*);
+ void onTimestamp(int64_t, const Descriptor*);
+
+ void onBinary(const CharSequence&, const Descriptor*);
+ void onString(const CharSequence&, const Descriptor*);
+ void onSymbol(const CharSequence&, const Descriptor*);
+
+ bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*);
+ bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*);
+ bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*);
+ void onEndList(uint32_t /*count*/, const Descriptor*);
+ void onEndMap(uint32_t /*count*/, const Descriptor*);
+ void onEndArray(uint32_t /*count*/, const Descriptor*);
+
+ bool proceed();
+ qpid::types::Variant& getValue();
+ private:
+ qpid::types::Variant base;
+ std::stack<qpid::types::Variant*> nested;
+ std::string key;
+
+ void handle(const qpid::types::Variant& v);
+ bool nest(const qpid::types::Variant& v);
+ void onString(const std::string&, const std::string&);
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_DATABUILDER_H*/
diff --git a/qpid/cpp/src/qpid/amqp/Encoder.cpp b/qpid/cpp/src/qpid/amqp/Encoder.cpp
index 549b6d1e4e..627cc4aed6 100644
--- a/qpid/cpp/src/qpid/amqp/Encoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/Encoder.cpp
@@ -23,11 +23,15 @@
#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/typecodes.h"
#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
#include "qpid/log/Statement.h"
#include "qpid/Exception.h"
#include <assert.h>
#include <string.h>
+using namespace qpid::types::encodings;
+
namespace qpid {
namespace amqp {
@@ -373,6 +377,87 @@ void Encoder::endArray32(size_t count, void* token)
end<uint32_t>(count, token, data+position);
}
+void Encoder::writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d, bool large)
+{
+ void* token = large ? startMap32(d) : startMap8(d);
+ for (qpid::types::Variant::Map::const_iterator i = value.begin(); i != value.end(); ++i) {
+ writeString(i->first);
+ writeValue(i->second);
+ }
+ if (large) endMap32(value.size()*2, token);
+ else endMap8(value.size()*2, token);
+}
+
+void Encoder::writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d, bool large)
+{
+ void* token = large ? startList32(d) : startList8(d);
+ for (qpid::types::Variant::List::const_iterator i = value.begin(); i != value.end(); ++i) {
+ writeValue(*i);
+ }
+ if (large) endList32(value.size(), token);
+ else endList8(value.size(), token);
+}
+
+void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d)
+{
+ switch (value.getType()) {
+ case qpid::types::VAR_VOID:
+ writeNull(d);
+ break;
+ case qpid::types::VAR_BOOL:
+ writeBoolean(value.asBool(), d);
+ break;
+ case qpid::types::VAR_UINT8:
+ writeUByte(value.asUint8(), d);
+ break;
+ case qpid::types::VAR_UINT16:
+ writeUShort(value.asUint16(), d);
+ break;
+ case qpid::types::VAR_UINT32:
+ writeUInt(value.asUint32(), d);
+ break;
+ case qpid::types::VAR_UINT64:
+ writeULong(value.asUint64(), d);
+ break;
+ case qpid::types::VAR_INT8:
+ writeByte(value.asInt8(), d);
+ break;
+ case qpid::types::VAR_INT16:
+ writeShort(value.asInt16(), d);
+ break;
+ case qpid::types::VAR_INT32:
+ writeInt(value.asInt32(), d);
+ break;
+ case qpid::types::VAR_INT64:
+ writeLong(value.asInt64(), d);
+ break;
+ case qpid::types::VAR_FLOAT:
+ writeFloat(value.asFloat(), d);
+ break;
+ case qpid::types::VAR_DOUBLE:
+ writeDouble(value.asDouble(), d);
+ break;
+ case qpid::types::VAR_STRING:
+ if (value.getEncoding() == UTF8) {
+ writeString(value.getString(), d);
+ } else if (value.getEncoding() == ASCII) {
+ writeSymbol(value.getString(), d);
+ } else {
+ writeBinary(value.getString(), d);
+ }
+ break;
+ case qpid::types::VAR_MAP:
+ writeMap(value.asMap(), d);
+ break;
+ case qpid::types::VAR_LIST:
+ writeList(value.asList(), d);
+ break;
+ case qpid::types::VAR_UUID:
+ writeUuid(value.asUuid(), d);
+ break;
+ }
+
+}
void Encoder::writeDescriptor(const Descriptor& d)
{
diff --git a/qpid/cpp/src/qpid/amqp/Encoder.h b/qpid/cpp/src/qpid/amqp/Encoder.h
index c661e4ac5d..d6c0054a14 100644
--- a/qpid/cpp/src/qpid/amqp/Encoder.h
+++ b/qpid/cpp/src/qpid/amqp/Encoder.h
@@ -23,12 +23,15 @@
*/
#include "qpid/sys/IntegerTypes.h"
#include "qpid/amqp/Constructor.h"
+#include <list>
+#include <map>
#include <stddef.h>
#include <string>
namespace qpid {
namespace types {
class Uuid;
+class Variant;
}
namespace amqp {
struct CharSequence;
@@ -91,6 +94,10 @@ class Encoder
void endArray8(size_t count, void*);
void endArray32(size_t count, void*);
+ void writeValue(const qpid::types::Variant&, const Descriptor* d=0);
+ void writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d=0, bool large=true);
+ void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true);
+
void writeDescriptor(const Descriptor&);
Encoder(char* data, size_t size);
size_t getPosition();
diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.cpp b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp
new file mode 100644
index 0000000000..f2ca8e8805
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ListBuilder.h"
+
+namespace qpid {
+namespace amqp {
+
+ListBuilder::ListBuilder() : DataBuilder(qpid::types::Variant::List()) {}
+
+qpid::types::Variant::List& ListBuilder::getList()
+{
+ return getValue().asList();
+}
+
+}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.h b/qpid/cpp/src/qpid/amqp/ListBuilder.h
new file mode 100644
index 0000000000..ee6af62539
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/ListBuilder.h
@@ -0,0 +1,40 @@
+#ifndef QPID_AMQP_LISTBUILDER_H
+#define QPID_AMQP_LISTBUILDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DataBuilder.h"
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Utility to build a Variant::List from a data stream
+ */
+class ListBuilder : public DataBuilder
+{
+ public:
+ ListBuilder();
+ qpid::types::Variant::List& getList();
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_LISTBUILDER_H*/
diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
index a554497791..ce8eea038e 100644
--- a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
+++ b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
@@ -19,112 +19,12 @@
*
*/
#include "MapBuilder.h"
-#include <assert.h>
namespace qpid {
namespace amqp {
-namespace {
-const std::string BINARY("binary");
-const std::string UTF8("utf8");
-const std::string ASCII("ascii");
-}
-
+MapBuilder::MapBuilder() : DataBuilder(qpid::types::Variant::Map()) {}
qpid::types::Variant::Map MapBuilder::getMap()
{
- return map;
-}
-const qpid::types::Variant::Map MapBuilder::getMap() const
-{
- return map;
-}
-
-void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = qpid::types::Variant();
-}
-void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
-{
- assert(value.size == 16);
- map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data);
-}
-
-void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
-{
- map[std::string(key.data, key.size)] = value;
-}
-
-void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
-{
- qpid::types::Variant& v = map[std::string(key.data, key.size)];
- v = std::string(value.data, value.size);
- v.setEncoding(BINARY);
-}
-
-void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
-{
- qpid::types::Variant& v = map[std::string(key.data, key.size)];
- v = std::string(value.data, value.size);
- v.setEncoding(UTF8);
-}
-
-void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
-{
- qpid::types::Variant& v = map[std::string(key.data, key.size)];
- v = std::string(value.data, value.size);
- v.setEncoding(ASCII);
+ return getValue().asMap();
}
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.h b/qpid/cpp/src/qpid/amqp/MapBuilder.h
index 0e3b95f633..500d2e6db3 100644
--- a/qpid/cpp/src/qpid/amqp/MapBuilder.h
+++ b/qpid/cpp/src/qpid/amqp/MapBuilder.h
@@ -21,42 +21,19 @@
* under the License.
*
*/
-#include "MapReader.h"
-#include "qpid/types/Variant.h"
+#include "DataBuilder.h"
namespace qpid {
namespace amqp {
/**
- * Utility to build a Variant::Map from a data stream (doesn't handle
- * nested maps or lists yet)
+ * Utility to build a Variant::Map from a data stream
*/
-class MapBuilder : public MapReader
+class MapBuilder : public DataBuilder
{
public:
- void onNullValue(const CharSequence& /*key*/, const Descriptor*);
- void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*);
- void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*);
- void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*);
- void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*);
- void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*);
- void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*);
- void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*);
- void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*);
- void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
- void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*);
- void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*);
- void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
- void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
-
- void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
- void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
- void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
-
+ MapBuilder();
qpid::types::Variant::Map getMap();
- const qpid::types::Variant::Map getMap() const;
- private:
- qpid::types::Variant::Map map;
};
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
index 3b493d1de7..beaea2befd 100644
--- a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
@@ -156,68 +156,6 @@ void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map&
writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large);
}
-void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large)
-{
- void* token = large ? startMap32(d) : startMap8(d);
- for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
- writeString(i->first);
- switch (i->second.getType()) {
- case qpid::types::VAR_MAP:
- case qpid::types::VAR_LIST:
- //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method)
- QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0");
- case qpid::types::VAR_VOID:
- writeNull();
- break;
- case qpid::types::VAR_BOOL:
- writeBoolean(i->second);
- break;
- case qpid::types::VAR_UINT8:
- writeUByte(i->second);
- break;
- case qpid::types::VAR_UINT16:
- writeUShort(i->second);
- break;
- case qpid::types::VAR_UINT32:
- writeUInt(i->second);
- break;
- case qpid::types::VAR_UINT64:
- writeULong(i->second);
- break;
- case qpid::types::VAR_INT8:
- writeByte(i->second);
- break;
- case qpid::types::VAR_INT16:
- writeShort(i->second);
- break;
- case qpid::types::VAR_INT32:
- writeInt(i->second);
- break;
- case qpid::types::VAR_INT64:
- writeULong(i->second);
- break;
- case qpid::types::VAR_FLOAT:
- writeFloat(i->second);
- break;
- case qpid::types::VAR_DOUBLE:
- writeDouble(i->second);
- break;
- case qpid::types::VAR_STRING:
- if (i->second.getEncoding() == BINARY) {
- writeBinary(i->second);
- } else {
- writeString(i->second);
- }
- break;
- case qpid::types::VAR_UUID:
- writeUuid(i->second);
- break;
- }
- }
- if (large) endMap32(properties.size()*2, token);
- else endMap8(properties.size()*2, token);
-}
-
size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
return getEncodedSize(h) + getEncodedSize(p, ap, d);
@@ -288,46 +226,56 @@ size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map
{
size_t total = 0;
for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
- total += 1/*code*/ + encodedSize(i->first);
-
- switch (i->second.getType()) {
- case qpid::types::VAR_MAP:
- case qpid::types::VAR_LIST:
- case qpid::types::VAR_VOID:
- case qpid::types::VAR_BOOL:
- total += 1;
- break;
-
- case qpid::types::VAR_UINT8:
- case qpid::types::VAR_INT8:
- total += 2;
- break;
-
- case qpid::types::VAR_UINT16:
- case qpid::types::VAR_INT16:
- total += 3;
- break;
-
- case qpid::types::VAR_UINT32:
- case qpid::types::VAR_INT32:
- case qpid::types::VAR_FLOAT:
- total += 5;
- break;
-
- case qpid::types::VAR_UINT64:
- case qpid::types::VAR_INT64:
- case qpid::types::VAR_DOUBLE:
- total += 9;
- break;
-
- case qpid::types::VAR_UUID:
- total += 17;
- break;
-
- case qpid::types::VAR_STRING:
- total += 1/*code*/ + encodedSize(i->second);
- break;
- }
+ total += 1/*code*/ + encodedSize(i->first) + getEncodedSizeForValue(i->second);
+ }
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSizeForValue(const qpid::types::Variant& value)
+{
+ size_t total = 0;
+ switch (value.getType()) {
+ case qpid::types::VAR_MAP:
+ total += getEncodedSize(value.asMap(), true);
+ break;
+ case qpid::types::VAR_LIST:
+ total += getEncodedSize(value.asList(), true);
+ break;
+
+ case qpid::types::VAR_VOID:
+ case qpid::types::VAR_BOOL:
+ total += 1;
+ break;
+
+ case qpid::types::VAR_UINT8:
+ case qpid::types::VAR_INT8:
+ total += 2;
+ break;
+
+ case qpid::types::VAR_UINT16:
+ case qpid::types::VAR_INT16:
+ total += 3;
+ break;
+
+ case qpid::types::VAR_UINT32:
+ case qpid::types::VAR_INT32:
+ case qpid::types::VAR_FLOAT:
+ total += 5;
+ break;
+
+ case qpid::types::VAR_UINT64:
+ case qpid::types::VAR_INT64:
+ case qpid::types::VAR_DOUBLE:
+ total += 9;
+ break;
+
+ case qpid::types::VAR_UUID:
+ total += 17;
+ break;
+
+ case qpid::types::VAR_STRING:
+ total += 1/*code*/ + encodedSize(value.getString());
+ break;
}
return total;
}
@@ -345,4 +293,20 @@ size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool
return total;
}
+
+size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::List& list, bool alwaysUseLargeList)
+{
+ size_t total(0);
+ for (qpid::types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ total += getEncodedSizeForValue(*i);
+ }
+
+ //its not just the count that determines whether we can use a small list, but the aggregate size:
+ if (alwaysUseLargeList || list.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
+ else total += 1/*size*/ + 1/*count*/;
+
+ total += 1 /*code for list itself*/;
+
+ return total;
+}
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.h b/qpid/cpp/src/qpid/amqp/MessageEncoder.h
index e9ab2c5f89..a9fb7366af 100644
--- a/qpid/cpp/src/qpid/amqp/MessageEncoder.h
+++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.h
@@ -91,22 +91,26 @@ class MessageEncoder : public Encoder
void writeApplicationProperties(const qpid::types::Variant::Map& properties);
void writeApplicationProperties(const qpid::types::Variant::Map& properties, bool useLargeMap);
- void writeMap(const qpid::types::Variant::Map& map, const Descriptor*, bool useLargeMap);
-
static size_t getEncodedSize(const Header&);
static size_t getEncodedSize(const Properties&);
static size_t getEncodedSize(const ApplicationProperties&);
- static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&);
+ static size_t getEncodedSize(const qpid::types::Variant::List&, bool useLargeList);
static size_t getEncodedSize(const qpid::types::Variant::Map&, bool useLargeMap);
- static size_t getEncodedSize(const qpid::types::Variant::Map&);
- static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&);
+
+ static size_t getEncodedSizeForValue(const qpid::types::Variant& value);
+ static size_t getEncodedSizeForContent(const std::string&);
+
+ //used in translating 0-10 content to 1.0, to determine buffer space needed
static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&);
+
private:
bool optimise;
+ static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&);
+ static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&);
+
static size_t getEncodedSizeForElements(const qpid::types::Variant::Map&);
- static size_t getEncodedSizeForContent(const std::string&);
};
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.cpp b/qpid/cpp/src/qpid/amqp/MessageReader.cpp
index 7a428bf4a9..726ee087d1 100644
--- a/qpid/cpp/src/qpid/amqp/MessageReader.cpp
+++ b/qpid/cpp/src/qpid/amqp/MessageReader.cpp
@@ -21,6 +21,7 @@
#include "qpid/amqp/MessageReader.h"
#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/typecodes.h"
#include "qpid/types/Uuid.h"
#include "qpid/types/Variant.h"
#include "qpid/log/Statement.h"
@@ -55,40 +56,6 @@ const size_t REPLY_TO_GROUP_ID(12);
}
-/*
-Reader& MessageReader::HeaderReader::getReader(size_t index)
-{
- switch (index) {
- case DURABLE: return durableReader;
- case PRIORITY: return priorityReader;
- case TTL: return ttlReader;
- case FIRST_ACQUIRER: return firstAcquirerReader;
- case DELIVERY_COUNT: return deliveryCountReader;
- default: return noSuchFieldReader;
- }
-}
-
-Reader& MessageReader::PropertiesReader::getReader(size_t index)
-{
- switch (index) {
- case MESSAGE_ID: return messageIdReader;
- case USER_ID: return userIdReader;
- case TO: return toReader;
- case SUBJECT: return subjectReader;
- case REPLY_TO: return replyToReader;
- case CORRELATION_ID: return correlationIdReader;
- case CONTENT_TYPE: return contentTypeReader;
- case CONTENT_ENCODING: return contentEncodingReader;
- case ABSOLUTE_EXPIRY_TIME: return absoluteExpiryTimeReader;
- case CREATION_TIME: return creationTimeReader;
- case GROUP_ID: return groupIdReader;
- case GROUP_SEQUENCE: return groupSequenceReader;
- case REPLY_TO_GROUP_ID: return replyToGroupIdReader;
- default: return noSuchFieldReader;
- }
-}
-*/
-
MessageReader::HeaderReader::HeaderReader(MessageReader& p) : parent(p), index(0) {}
void MessageReader::HeaderReader::onBoolean(bool v, const Descriptor*) // durable, first-acquirer
{
@@ -234,8 +201,11 @@ bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const D
} else if (descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE)) {
delegate = &propertiesReader;
return true;
- } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE)) {
+ onAmqpSequence(raw);
+ return false;
+ } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
+ onAmqpValue(raw, qpid::amqp::typecodes::LIST_NAME);
return false;
} else {
QPID_LOG(warning, "Unexpected described list: " << *descriptor);
@@ -276,7 +246,7 @@ bool MessageReader::onStartMap(uint32_t count, const CharSequence& raw, const De
onApplicationProperties(raw);
return false;
} else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ onAmqpValue(raw, qpid::amqp::typecodes::MAP_NAME);
return false;
} else {
QPID_LOG(warning, "Unexpected described map: " << *descriptor);
@@ -300,8 +270,10 @@ void MessageReader::onBinary(const CharSequence& bytes, const Descriptor* descri
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got binary value with no descriptor.");
- } else if (descriptor->match(DATA_SYMBOL, DATA_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(bytes, *descriptor);
+ } else if (descriptor->match(DATA_SYMBOL, DATA_CODE)) {
+ onData(bytes);
+ } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
+ onAmqpValue(bytes, qpid::amqp::typecodes::BINARY_NAME);
} else {
QPID_LOG(warning, "Unexpected binary value with descriptor: " << *descriptor);
}
@@ -317,7 +289,7 @@ void MessageReader::onNull(const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant v;
- onBody(v, *descriptor);
+ onAmqpValue(v);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got null value with no descriptor.");
@@ -333,7 +305,7 @@ void MessageReader::onString(const CharSequence& v, const Descriptor* descriptor
delegate->onString(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::STRING_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got string value with no descriptor.");
@@ -349,7 +321,7 @@ void MessageReader::onSymbol(const CharSequence& v, const Descriptor* descriptor
delegate->onSymbol(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::SYMBOL_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got symbol value with no descriptor.");
@@ -367,7 +339,7 @@ void MessageReader::onBoolean(bool v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got boolean value with no descriptor.");
@@ -385,7 +357,7 @@ void MessageReader::onUByte(uint8_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ubyte value with no descriptor.");
@@ -403,7 +375,7 @@ void MessageReader::onUShort(uint16_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ushort value with no descriptor.");
@@ -421,7 +393,7 @@ void MessageReader::onUInt(uint32_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got uint value with no descriptor.");
@@ -439,7 +411,7 @@ void MessageReader::onULong(uint64_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got ulong value with no descriptor.");
@@ -457,7 +429,7 @@ void MessageReader::onByte(int8_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got byte value with no descriptor.");
@@ -475,7 +447,7 @@ void MessageReader::onShort(int16_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got short value with no descriptor.");
@@ -493,7 +465,7 @@ void MessageReader::onInt(int32_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got int value with no descriptor.");
@@ -511,7 +483,7 @@ void MessageReader::onLong(int64_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got long value with no descriptor.");
@@ -529,7 +501,7 @@ void MessageReader::onFloat(float v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got float value with no descriptor.");
@@ -547,7 +519,7 @@ void MessageReader::onDouble(double v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got double value with no descriptor.");
@@ -564,7 +536,7 @@ void MessageReader::onUuid(const CharSequence& v, const Descriptor* descriptor)
delegate->onUuid(v, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(v, *descriptor);
+ onAmqpValue(v, qpid::amqp::typecodes::UUID_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got uuid value with no descriptor.");
@@ -582,7 +554,7 @@ void MessageReader::onTimestamp(int64_t v, const Descriptor* descriptor)
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
qpid::types::Variant body = v;
- onBody(body, *descriptor);
+ onAmqpValue(body);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got timestamp value with no descriptor.");
@@ -599,7 +571,8 @@ bool MessageReader::onStartArray(uint32_t count, const CharSequence& raw, const
return delegate->onStartArray(count, raw, constructor, descriptor);
} else {
if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) {
- onBody(raw, *descriptor);
+ //TODO: might be better to decode this here
+ onAmqpValue(raw, qpid::amqp::typecodes::ARRAY_NAME);
} else {
if (!descriptor) {
QPID_LOG(warning, "Expected described type but got array with no descriptor.");
diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.h b/qpid/cpp/src/qpid/amqp/MessageReader.h
index 5d26b288f5..2dc112a28a 100644
--- a/qpid/cpp/src/qpid/amqp/MessageReader.h
+++ b/qpid/cpp/src/qpid/amqp/MessageReader.h
@@ -98,172 +98,21 @@ class MessageReader : public Reader
virtual void onApplicationProperties(const CharSequence&) = 0;
virtual void onDeliveryAnnotations(const CharSequence&) = 0;
virtual void onMessageAnnotations(const CharSequence&) = 0;
- virtual void onBody(const CharSequence&, const Descriptor&) = 0;
- virtual void onBody(const qpid::types::Variant&, const Descriptor&) = 0;
+
+ virtual void onData(const CharSequence&) = 0;
+ virtual void onAmqpSequence(const CharSequence&) = 0;
+ virtual void onAmqpValue(const CharSequence&, const std::string& type) = 0;
+ virtual void onAmqpValue(const qpid::types::Variant&) = 0;
+
virtual void onFooter(const CharSequence&) = 0;
QPID_COMMON_EXTERN CharSequence getBareMessage() const;
private:
- /*
- class DurableReader : public Reader
- {
- public:
- DurableReader(MessageReader&);
- void onBoolean(bool v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class PriorityReader : public Reader
- {
- public:
- PriorityReader(MessageReader&);
- void onUByte(uint8_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class TtlReader : public Reader
- {
- public:
- TtlReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class FirstAcquirerReader : public Reader
- {
- public:
- FirstAcquirerReader(MessageReader&);
- void onBoolean(bool v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class DeliveryCountReader : public Reader
- {
- public:
- DeliveryCountReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class MessageIdReader : public Reader
- {
- public:
- MessageIdReader(MessageReader&);
- void onUuid(const qpid::types::Uuid& v, const Descriptor*);
- void onULong(uint64_t v, const Descriptor*);
- void onString(const CharSequence& v, const Descriptor*);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class UserIdReader : public Reader
- {
- public:
- UserIdReader(MessageReader&);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ToReader : public Reader
- {
- public:
- ToReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class SubjectReader : public Reader
+ class HeaderReader : public Reader
{
public:
- SubjectReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ReplyToReader : public Reader
- {
- public:
- ReplyToReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class CorrelationIdReader : public Reader
- {
- public:
- CorrelationIdReader(MessageReader&);
- void onUuid(const qpid::types::Uuid& v, const Descriptor*);
- void onULong(uint64_t v, const Descriptor*);
- void onString(const CharSequence& v, const Descriptor*);
- void onBinary(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ContentTypeReader : public Reader
- {
- public:
- ContentTypeReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ContentEncodingReader : public Reader
- {
- public:
- ContentEncodingReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class AbsoluteExpiryTimeReader : public Reader
- {
- public:
- AbsoluteExpiryTimeReader(MessageReader&);
- void onTimestamp(int64_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class CreationTimeReader : public Reader
- {
- public:
- CreationTimeReader(MessageReader&);
- void onTimestamp(int64_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class GroupIdReader : public Reader
- {
- public:
- GroupIdReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class GroupSequenceReader : public Reader
- {
- public:
- GroupSequenceReader(MessageReader&);
- void onUInt(uint32_t v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- class ReplyToGroupIdReader : public Reader
- {
- public:
- ReplyToGroupIdReader(MessageReader&);
- void onString(const CharSequence& v, const Descriptor*);
- private:
- MessageReader& parent;
- };
- */
-
- class HeaderReader : public Reader //public ListReader
- {
- public:
- //Reader& getReader(size_t index);
-
HeaderReader(MessageReader&);
void onBoolean(bool v, const Descriptor*); // durable, first-acquirer
void onUByte(uint8_t v, const Descriptor*); // priority
@@ -273,11 +122,9 @@ class MessageReader : public Reader
MessageReader& parent;
size_t index;
};
- class PropertiesReader : public Reader //public ListReader
+ class PropertiesReader : public Reader
{
public:
- //Reader& getReader(size_t index);
-
PropertiesReader(MessageReader&);
void onUuid(const CharSequence& v, const Descriptor*); // message-id, correlation-id
void onULong(uint64_t v, const Descriptor*); // message-id, correlation-id
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index 248d6df2df..e395fc25d7 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -52,6 +52,7 @@ const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE);
const Descriptor MESSAGE_ANNOTATIONS(MESSAGE_ANNOTATIONS_CODE);
const Descriptor PROPERTIES(PROPERTIES_CODE);
const Descriptor APPLICATION_PROPERTIES(APPLICATION_PROPERTIES_CODE);
+const Descriptor AMQP_VALUE(AMQP_VALUE_CODE);
const Descriptor DATA(DATA_CODE);
}
diff --git a/qpid/cpp/src/qpid/amqp/typecodes.h b/qpid/cpp/src/qpid/amqp/typecodes.h
index 3c6bd17b97..915b75ca3f 100644
--- a/qpid/cpp/src/qpid/amqp/typecodes.h
+++ b/qpid/cpp/src/qpid/amqp/typecodes.h
@@ -83,7 +83,7 @@ const uint8_t ARRAY32(0xf0);
const std::string NULL_NAME("null");
-const std::string BOOLEAN_NAME("name");
+const std::string BOOLEAN_NAME("bool");
const std::string UBYTE_NAME("ubyte");
const std::string USHORT_NAME("ushort");
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
index 3dcf1c14e6..5ea9664ea8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
@@ -21,9 +21,13 @@
#include "Message.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/descriptors.h"
-#include "qpid/amqp/Reader.h"
-#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/Reader.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/Buffer.h"
#include <string.h>
@@ -256,10 +260,52 @@ void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; }
void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
-void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; }
-void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
+
+void Message::onData(const qpid::amqp::CharSequence& v) { body = v; }
+void Message::onAmqpSequence(const qpid::amqp::CharSequence& v) { body = v; bodyType = qpid::amqp::typecodes::LIST_NAME; }
+void Message::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& t)
+{
+ body = v;
+ if (t == qpid::amqp::typecodes::STRING_NAME) {
+ bodyType = qpid::types::encodings::UTF8;
+ } else if (t == qpid::amqp::typecodes::SYMBOL_NAME) {
+ bodyType = qpid::types::encodings::ASCII;
+ } else if (t == qpid::amqp::typecodes::BINARY_NAME) {
+ bodyType = qpid::types::encodings::BINARY;
+ } else {
+ bodyType = t;
+ }
+}
+void Message::onAmqpValue(const qpid::types::Variant& v) { typedBody = v; }
+
void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; }
+bool Message::isTypedBody() const
+{
+ return !typedBody.isVoid() || !bodyType.empty();
+}
+
+qpid::types::Variant Message::getTypedBody() const
+{
+ if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ return builder.getList();
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::MapBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ return builder.getMap();
+ } else if (!bodyType.empty()) {
+ qpid::types::Variant value(std::string(body.data, body.size));
+ value.setEncoding(bodyType);
+ return value;
+ } else {
+ return typedBody;
+ }
+}
+
//PersistableMessage interface:
void Message::encode(framing::Buffer& buffer) const
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index cbf8669fc1..c7e2df84aa 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -64,6 +64,8 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
qpid::amqp::CharSequence getBareMessage() const;
qpid::amqp::CharSequence getBody() const;
qpid::amqp::CharSequence getFooter() const;
+ bool isTypedBody() const;
+ qpid::types::Variant getTypedBody() const;
Message(size_t size);
char* getData();
@@ -109,6 +111,8 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
//body:
qpid::amqp::CharSequence body;
+ qpid::types::Variant typedBody;
+ std::string bodyType;
//footer:
qpid::amqp::CharSequence footer;
@@ -139,8 +143,12 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
void onApplicationProperties(const qpid::amqp::CharSequence&);
void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
void onMessageAnnotations(const qpid::amqp::CharSequence&);
- void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
- void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
+
+ void onData(const qpid::amqp::CharSequence&);
+ void onAmqpSequence(const qpid::amqp::CharSequence&);
+ void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+ void onAmqpValue(const qpid::types::Variant&);
+
void onFooter(const qpid::amqp::CharSequence&);
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 668b500570..366218c002 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -88,7 +88,7 @@ bool expired(const sys::AbsTime& start, double timeout)
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
minReconnectInterval(0.001), maxReconnectInterval(2),
- retries(0), reconnectOnLimitExceeded(true)
+ retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
{
setOptions(options);
urls.insert(urls.begin(), url);
@@ -157,8 +157,10 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
settings.sslCertName = value.asString();
} else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
reconnectOnLimitExceeded = value;
- } else if (name == "client-properties") {
+ } else if (name == "client-properties" || name == "client_properties") {
amqp_0_10::translate(value.asMap(), settings.clientProperties);
+ } else if (name == "disable-auto-decode" || name == "disable_auto_decode") {
+ disableAutoDecode = value;
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
@@ -346,4 +348,9 @@ std::string ConnectionImpl::getAuthenticatedUsername()
return connection.getNegotiatedSettings().username;
}
+bool ConnectionImpl::getAutoDecode() const
+{
+ return !disableAutoDecode;
+}
+
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index d1ac4533d5..00ac30a6df 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -53,6 +53,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
void setOption(const std::string& name, const qpid::types::Variant& value);
bool backoff();
std::string getAuthenticatedUsername();
+ bool getAutoDecode() const;
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -70,6 +71,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
+ bool disableAutoDecode;
void setOptions(const qpid::types::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index db6e843cf6..27a2107702 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -399,7 +399,7 @@ void populate(qpid::messaging::Message& message, FrameSet& command)
//need to be able to link the message back to the transfer it was delivered by
//e.g. for rejecting.
MessageImplAccess::get(message).setInternalId(command.getId());
-
+
message.setContent(command.getContent());
populateHeaders(message, command.getHeaders());
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index 348f9e160c..41b30be4fe 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -50,8 +50,20 @@ const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
void OutgoingMessage::convert(const qpid::messaging::Message& from)
{
//TODO: need to avoid copying as much as possible
- message.setData(from.getContent());
- message.getMessageProperties().setContentType(from.getContentType());
+ if (from.getContentObject().getType() == qpid::types::VAR_MAP) {
+ std::string content;
+ qpid::amqp_0_10::MapCodec::encode(from.getContentObject().asMap(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ message.setData(content);
+ } else if (from.getContentObject().getType() == qpid::types::VAR_LIST) {
+ std::string content;
+ qpid::amqp_0_10::ListCodec::encode(from.getContentObject().asList(), content);
+ message.getMessageProperties().setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ message.setData(content);
+ } else {
+ message.setData(from.getContent());
+ message.getMessageProperties().setContentType(from.getContentType());
+ }
if ( !from.getCorrelationId().empty() )
message.getMessageProperties().setCorrelationId(from.getCorrelationId());
message.getMessageProperties().setUserId(from.getUserId());
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 7e8de21247..c3c8b83dc5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -25,6 +25,7 @@
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
+#include "qpid/amqp_0_10/Codecs.h"
namespace qpid {
namespace client {
@@ -148,9 +149,9 @@ qpid::messaging::Address ReceiverImpl::getAddress() const
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ const qpid::messaging::Address& a, bool autoDecode_) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), autoDecode(autoDecode_),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -159,7 +160,20 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D
sys::Mutex::ScopedLock l(lock);
if (state == CANCELLED) return false;
}
- return parent->get(*this, message, timeout);
+ if (parent->get(*this, message, timeout)) {
+ if (autoDecode) {
+ if (message.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::Map();
+ decode(message, message.getContentObject().asMap());
+ } else if (message.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ message.getContentObject() = qpid::types::Variant::List();
+ decode(message, message.getContentObject().asList());
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
}
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 4dba76c8d9..0d3366907b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -48,7 +48,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
ReceiverImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address);
+ const qpid::messaging::Address& address, bool autoDecode);
void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -74,6 +74,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
const std::string destination;
const qpid::messaging::Address address;
const uint32_t byteCredit;
+ const bool autoDecode;
State state;
std::auto_ptr<MessageSource> source;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index e4c2c6afb8..982bfa3503 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -207,7 +207,7 @@ Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address
ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, address));
+ Receiver receiver(new ReceiverImpl(*this, name, address, connection->getAutoDecode()));
getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
receivers[name] = receiver;
return receiver;
diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp
index 0f03bc8ca3..b40ae06cbc 100644
--- a/qpid/cpp/src/qpid/messaging/Message.cpp
+++ b/qpid/cpp/src/qpid/messaging/Message.cpp
@@ -31,6 +31,10 @@ using namespace qpid::types;
Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}
+Message::Message(qpid::types::Variant& c) : impl(new MessageImpl(std::string()))
+{
+ setContentObject(c);
+}
Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
Message::~Message() { delete impl; }
@@ -75,6 +79,13 @@ void Message::setContent(const std::string& c) { impl->setBytes(c); }
void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
std::string Message::getContent() const { return impl->getBytes(); }
+void Message::setContentBytes(const std::string& c) { impl->setBytes(c); }
+std::string Message::getContentBytes() const { return impl->getBytes(); }
+
+qpid::types::Variant& Message::getContentObject() { return impl->getContent(); }
+void Message::setContentObject(const qpid::types::Variant& c) { impl->getContent() = c; }
+const qpid::types::Variant& Message::getContentObject() const { return impl->getContent(); }
+
const char* Message::getContentPtr() const
{
return impl->getBytes().data();
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
index fc9bc5dfa1..7b5854745e 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,19 +30,21 @@ const std::string EMPTY_STRING = "";
using namespace qpid::types;
-MessageImpl::MessageImpl(const std::string& c) :
+MessageImpl::MessageImpl(const std::string& c) :
priority(0),
ttl(0),
durable(false),
redelivered(false),
bytes(c),
+ contentDecoded(false),
internalId(0) {}
-MessageImpl::MessageImpl(const char* chars, size_t count) :
+MessageImpl::MessageImpl(const char* chars, size_t count) :
priority(0),
ttl(0),
durable (false),
redelivered(false),
bytes(chars, count),
+ contentDecoded(false),
internalId(0) {}
void MessageImpl::setReplyTo(const Address& d)
@@ -167,21 +169,35 @@ void MessageImpl::setBytes(const char* chars, size_t count)
bytes.assign(chars, count);
updated();
}
-void MessageImpl::appendBytes(const char* chars, size_t count)
-{
- bytes.append(chars, count);
- updated();
-}
const std::string& MessageImpl::getBytes() const
{
- if (!bytes.size() && encoded) encoded->getBody(bytes);
- return bytes;
+ if (encoded && !contentDecoded) {
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
+ if (bytes.empty() && !content.isVoid()) return content.getString();
+ else return bytes;
}
std::string& MessageImpl::getBytes()
{
- if (!bytes.size() && encoded) encoded->getBody(bytes);
updated();//have to assume body may be edited, invalidating our message
- return bytes;
+ if (bytes.empty() && !content.isVoid()) return content.getString();
+ else return bytes;
+}
+
+qpid::types::Variant& MessageImpl::getContent()
+{
+ updated();//have to assume content may be edited, invalidating our message
+ return content;
+}
+
+const qpid::types::Variant& MessageImpl::getContent() const
+{
+ if (encoded && !contentDecoded) {
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
+ return content;
}
void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
@@ -197,7 +213,10 @@ void MessageImpl::updated()
if (!userId.size() && encoded) encoded->getUserId(userId);
if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
if (!headers.size() && encoded) encoded->populate(headers);
- if (!bytes.size() && encoded) encoded->getBody(bytes);
+ if (encoded && !contentDecoded) {
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
encoded.reset();
}
@@ -210,5 +229,4 @@ const MessageImpl& MessageImplAccess::get(const Message& msg)
{
return *msg.impl;
}
-
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h
index 915c790153..6387daafb7 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.h
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,6 +47,8 @@ class MessageImpl
mutable qpid::types::Variant::Map headers;
mutable std::string bytes;
+ mutable qpid::types::Variant content;
+ mutable bool contentDecoded;
boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded;
qpid::framing::SequenceNumber internalId;
@@ -87,9 +89,10 @@ class MessageImpl
void setBytes(const std::string& bytes);
void setBytes(const char* chars, size_t count);
- void appendBytes(const char* chars, size_t count);
const std::string& getBytes() const;
std::string& getBytes();
+ qpid::types::Variant& getContent();
+ const qpid::types::Variant& getContent() const;
void setInternalId(qpid::framing::SequenceNumber id);
qpid::framing::SequenceNumber getInternalId();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
index b4e0819980..a0bf8dc575 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -22,13 +22,18 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/DataBuilder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
#include <string.h>
namespace qpid {
namespace messaging {
namespace amqp {
-
using namespace qpid::amqp;
EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0)
@@ -178,9 +183,39 @@ void EncodedMessage::getCorrelationId(std::string& s) const
{
correlationId.assign(s);
}
-void EncodedMessage::getBody(std::string& s) const
+void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- s.assign(body.data, body.size);
+ //TODO: based on section type, populate content
+ if (!content.isVoid()) {
+ c = content;//integer types, floats, bool etc
+ //TODO: populate raw data?
+ } else {
+ if (bodyType.empty()
+ || bodyType == qpid::amqp::typecodes::BINARY_NAME
+ || bodyType == qpid::amqp::typecodes::STRING_NAME
+ || bodyType == qpid::amqp::typecodes::SYMBOL_NAME)
+ {
+ c = std::string(body.data, body.size);
+ c.setEncoding(bodyType);
+ } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getList();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getValue().asMap();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+ if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
+ raw.assign(body.data, body.size);
+ }
+ }
}
qpid::amqp::CharSequence EncodedMessage::getBody() const
@@ -216,6 +251,7 @@ bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) c
}
+
EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
{
//set up defaults as needed:
@@ -252,12 +288,32 @@ void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequenc
void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
-void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
+
+void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v)
+{
+ em.body = v;
+}
+void EncodedMessage::InitialScan::onAmqpSequence(const qpid::amqp::CharSequence& v)
{
- //TODO: how to communicate the type, i.e. descriptor?
em.body = v;
+ em.bodyType = qpid::amqp::typecodes::LIST_NAME;
}
-void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& type)
+{
+ em.body = v;
+ if (type == qpid::amqp::typecodes::STRING_NAME) {
+ em.bodyType = qpid::types::encodings::UTF8;
+ } else if (type == qpid::amqp::typecodes::SYMBOL_NAME) {
+ em.bodyType = qpid::types::encodings::ASCII;
+ } else {
+ em.bodyType = type;
+ }
+}
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v)
+{
+ em.content = v;
+}
+
void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h
index 09a9d948d5..233b718dec 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h
@@ -89,13 +89,14 @@ class EncodedMessage
void getMessageId(std::string&) const;
void getUserId(std::string&) const;
void getCorrelationId(std::string&) const;
+ void populate(qpid::types::Variant::Map&) const;
+ void getBody(std::string&, qpid::types::Variant&) const;
void init(qpid::messaging::MessageImpl&);
- void populate(qpid::types::Variant::Map&) const;
- void getBody(std::string&) const;
qpid::amqp::CharSequence getBareMessage() const;
qpid::amqp::CharSequence getBody() const;
bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
+
private:
size_t size;
char* data;
@@ -130,8 +131,12 @@ class EncodedMessage
void onApplicationProperties(const qpid::amqp::CharSequence&);
void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
void onMessageAnnotations(const qpid::amqp::CharSequence&);
- void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
- void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
+
+ void onData(const qpid::amqp::CharSequence&);
+ void onAmqpSequence(const qpid::amqp::CharSequence&);
+ void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+ void onAmqpValue(const qpid::types::Variant&);
+
void onFooter(const qpid::amqp::CharSequence&);
private:
EncodedMessage& em;
@@ -164,7 +169,11 @@ class EncodedMessage
qpid::amqp::CharSequence replyToGroupId;
//application-properties:
qpid::amqp::CharSequence applicationProperties;
+ //application data:
qpid::amqp::CharSequence body;
+ std::string bodyType;
+ qpid::types::Variant content;
+
//footer:
qpid::amqp::CharSequence footer;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 6e3a432a59..1043a0c62c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -440,7 +440,15 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
PropertiesAdapter properties(msg, address.getSubject());
ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
//compute size:
- encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, applicationProperties, msg.getBytes()));
+ size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+ if (msg.getContent().isVoid()) {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+ } else {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+ }
+ encoded.resize(contentSize);
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
//write header:
@@ -451,7 +459,12 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
//write application-properties
encoder.writeApplicationProperties(applicationProperties);
//write body
- if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ if (!msg.getContent().isVoid()) {
+ //write as AmqpValue
+ encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+ } else if (msg.getBytes().size()) {
+ encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ }
if (encoder.getPosition() < encoded.getSize()) {
QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
encoded.trim(encoder.getPosition());
diff --git a/qpid/cpp/src/qpid/types/encodings.h b/qpid/cpp/src/qpid/types/encodings.h
new file mode 100644
index 0000000000..827b6964b9
--- /dev/null
+++ b/qpid/cpp/src/qpid/types/encodings.h
@@ -0,0 +1,33 @@
+#ifndef QPID_TYPES_ENCODINGS_H
+#define QPID_TYPES_ENCODINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace types {
+namespace encodings {
+const std::string BINARY("binary");
+const std::string UTF8("utf8");
+const std::string ASCII("ascii");
+}
+}} // namespace qpid::types
+
+#endif /*!QPID_TYPES_ENCODINGS_H*/
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index d43598f551..d00b828ddc 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -232,8 +232,10 @@ int main(int argc, char ** argv)
std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
- if (opts.printContent)
- std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+ if (opts.printContent) {
+ if (!msg.getContentObject().isVoid()) std::cout << msg.getContentObject() << std::endl;
+ else std::cout << msg.getContent() << std::endl;
+ }
if (opts.messages && count >= opts.messages) done = true;
}
}
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index 72ccf1466d..efc6f95448 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -262,9 +262,8 @@ class MapContentGenerator : public ContentGenerator {
public:
MapContentGenerator(const Options& opt) : opts(opt) {}
virtual bool setContent(Message& msg) {
- Variant::Map map;
- opts.setEntries(map);
- encode(map, msg);
+ msg.getContentObject() = qpid::types::Variant::Map();
+ opts.setEntries(msg.getContentObject().asMap());
return true;
}
private: