diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/amqp/DataReader.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/amqp/DataReader.cpp')
-rw-r--r-- | cpp/src/qpid/broker/amqp/DataReader.cpp | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/DataReader.cpp b/cpp/src/qpid/broker/amqp/DataReader.cpp new file mode 100644 index 0000000000..519dd71c9c --- /dev/null +++ b/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataReader.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/log/Statement.h" +#include <string> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +qpid::amqp::CharSequence convert(pn_bytes_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.start; + out.size = in.size; + return out; +} + +qpid::amqp::CharSequence convert(pn_uuid_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.bytes; + out.size = 16; + return out; +} +} + +DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {} + +void DataReader::read(pn_data_t* data) +{ + /* + while (pn_data_next(data)) { + readOne(data); + } + */ + do { + readOne(data); + } while (pn_data_next(data)); +} +void DataReader::readOne(pn_data_t* data) +{ + qpid::amqp::Descriptor descriptor(0); + bool described = pn_data_is_described(data); + if (described) { + pn_data_enter(data); + pn_data_next(data); + if (pn_data_type(data) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data)); + } else if (pn_data_type(data) == PN_SYMBOL) { + descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data)); + } + pn_data_next(data); + } + switch (pn_data_type(data)) { + case PN_NULL: + reader.onNull(described ? &descriptor : 0); + break; + case PN_BOOL: + reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0); + break; + case PN_UBYTE: + reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0); + break; + case PN_BYTE: + reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0); + break; + case PN_USHORT: + reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0); + break; + case PN_SHORT: + reader.onShort(pn_data_get_short(data), described ? &descriptor : 0); + break; + case PN_UINT: + reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0); + break; + case PN_INT: + reader.onInt(pn_data_get_int(data), described ? &descriptor : 0); + break; + case PN_CHAR: + pn_data_get_char(data); + break; + case PN_ULONG: + reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0); + break; + case PN_LONG: + reader.onLong(pn_data_get_long(data), described ? &descriptor : 0); + break; + case PN_TIMESTAMP: + reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0); + break; + case PN_FLOAT: + reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0); + break; + case PN_DOUBLE: + reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0); + break; + case PN_DECIMAL32: + pn_data_get_decimal32(data); + break; + case PN_DECIMAL64: + pn_data_get_decimal64(data); + break; + case PN_DECIMAL128: + pn_data_get_decimal128(data); + break; + case PN_UUID: + reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0); + break; + case PN_BINARY: + reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0); + break; + case PN_STRING: + reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0); + break; + case PN_SYMBOL: + reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0); + break; + case PN_DESCRIBED: + break; + case PN_ARRAY: + readArray(data, described ? &descriptor : 0); + break; + case PN_LIST: + readList(data, described ? &descriptor : 0); + break; + case PN_MAP: + readMap(data, described ? &descriptor : 0); + break; + } + if (described) pn_data_exit(data); +} + +void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/) +{ + //not yet implemented +} + +void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_list(data); + reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndList(count, descriptor); +} + +void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_map(data); + reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndMap(count, descriptor); +} +}}} // namespace qpid::broker::amqp |