diff options
| author | Gordon Sim <gsim@apache.org> | 2010-03-31 16:17:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-03-31 16:17:17 +0000 |
| commit | f6f1900eb98cc1773a88a3ec309afa646438a384 (patch) | |
| tree | 68e50e7aa1819afd283d73700965b539355a779d /cpp/src/qpid/client | |
| parent | 887281838e4bf7825189ce3b0a8d7509789e6a08 (diff) | |
| download | qpid-python-f6f1900eb98cc1773a88a3ec309afa646438a384.tar.gz | |
QPID-664: made changes suggested by Alan Conway, also moved 0-10 map/list codecs to common lib
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/Codecs.cpp | 332 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/CodecsInternal.h | 41 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 4 |
6 files changed, 27 insertions, 413 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 990b2a19d8..f64a46ba01 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -19,8 +19,7 @@ * */ #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" #include "qpid/client/amqp0_10/OutgoingMessage.h" @@ -55,6 +54,7 @@ using qpid::framing::ReplyTo; using qpid::framing::Uuid; using namespace qpid::types; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; using namespace boost::assign; @@ -262,43 +262,29 @@ bool in(const Variant& value, const std::vector<std::string>& choices) return false; } -bool getReceiverPolicy(const Address& address, const std::string& key) -{ - return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER)); -} - -bool getSenderPolicy(const Address& address, const std::string& key) -{ - return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER)); -} - -const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0) +const Variant& getOption(const Variant::Map& options, const std::string& name) { - Variant::Map::const_iterator j = options.find(path[index]); + Variant::Map::const_iterator j = options.find(name); if (j == options.end()) { return EMPTY_VARIANT; - } else if (++index < path.size()) { - if (j->second.getType() != VAR_MAP) - throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str()); - return getOption(j->second.asMap(), path, index); } else { return j->second; } } -const Variant& getOption(const Address& address, const std::vector<std::string>& path) +const Variant& getOption(const Address& address, const std::string& name) { - return getOption(address.getOptions(), path); + return getOption(address.getOptions(), name); } -const Variant& getOption(const Variant::Map& options, const std::string& name) +bool getReceiverPolicy(const Address& address, const std::string& key) { - Variant::Map::const_iterator j = options.find(name); - if (j == options.end()) { - return EMPTY_VARIANT; - } else { - return j->second; - } + return in(getOption(address, key), list_of<std::string>(ALWAYS)(RECEIVER)); +} + +bool getSenderPolicy(const Address& address, const std::string& key) +{ + return in(getOption(address, key), list_of<std::string>(ALWAYS)(SENDER)); } struct Opt @@ -360,13 +346,14 @@ void Opt::collect(qpid::framing::FieldTable& args) const bool AddressResolution::is_unreliable(const Address& address) { - return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), + + return in((Opt(address)/LINK/RELIABILITY).str(), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); } bool AddressResolution::is_reliable(const Address& address) { - return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), + return in((Opt(address)/LINK/RELIABILITY).str(), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } @@ -433,7 +420,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session bool isBrowse(const Address& address) { - const Variant& mode = address.getOption(MODE); + const Variant& mode = getOption(address, MODE); if (!mode.isVoid()) { std::string value = mode.asString(); if (value == BROWSE) return true; @@ -651,9 +638,9 @@ bool isTopic(qpid::client::Session session, const qpid::messaging::Address& addr } Node::Node(const Address& address) : name(address.getName()), - createPolicy(address.getOption(CREATE)), - assertPolicy(address.getOption(ASSERT)), - deletePolicy(address.getOption(DELETE)) + createPolicy(getOption(address, CREATE)), + assertPolicy(getOption(address, ASSERT)), + deletePolicy(getOption(address, DELETE)) { nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList()); linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList()); @@ -908,7 +895,7 @@ bool Node::enabled(const Variant& policy, CheckMode mode) bool Node::createEnabled(const Address& address, CheckMode mode) { - const Variant& policy = address.getOption(CREATE); + const Variant& policy = getOption(address, CREATE); return enabled(policy, mode); } diff --git a/cpp/src/qpid/client/amqp0_10/Codecs.cpp b/cpp/src/qpid/client/amqp0_10/Codecs.cpp deleted file mode 100644 index ce806572e5..0000000000 --- a/cpp/src/qpid/client/amqp0_10/Codecs.cpp +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/types/Variant.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/List.h" -#include "qpid/log/Statement.h" -#include <algorithm> -#include <functional> -#include <limits> - -using namespace qpid::framing; -using namespace qpid::messaging; -using namespace qpid::types; - -namespace qpid { -namespace client { -namespace amqp0_10 { - -namespace { -const std::string iso885915("iso-8859-15"); -const std::string utf8("utf8"); -const std::string utf16("utf16"); -const std::string binary("binary"); -const std::string amqp0_10_binary("amqp0-10:binary"); -const std::string amqp0_10_bit("amqp0-10:bit"); -const std::string amqp0_10_datetime("amqp0-10:datetime"); -const std::string amqp0_10_struct("amqp0-10:struct"); -} - -template <class T, class U, class F> void convert(const T& from, U& to, F f) -{ - std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f); -} - -Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in); -FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in); -Variant toVariant(boost::shared_ptr<FieldValue> in); -boost::shared_ptr<FieldValue> toFieldValue(const Variant& in); - -template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f) -{ - T t; - getEncodedValue<T>(in, t); - convert(t, u, f); -} - -template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f) -{ - typename T::ValueType t; - convert(u, t, f); - return new T(t); -} - -FieldTableValue* toFieldTableValue(const Variant::Map& map) -{ - FieldTable ft; - convert(map, ft, &toFieldTableEntry); - return new FieldTableValue(ft); -} - -ListValue* toListValue(const Variant::List& list) -{ - List l; - convert(list, l, &toFieldValue); - return new ListValue(l); -} - -void setEncodingFor(Variant& out, uint8_t code) -{ - switch(code){ - case 0x80: - case 0x90: - case 0xa0: - out.setEncoding(amqp0_10_binary); - break; - case 0x84: - case 0x94: - out.setEncoding(iso885915); - break; - case 0x85: - case 0x95: - out.setEncoding(utf8); - break; - case 0x86: - case 0x96: - out.setEncoding(utf16); - break; - case 0xab: - out.setEncoding(amqp0_10_struct); - break; - default: - //do nothing - break; - } -} - -qpid::types::Uuid getUuid(FieldValue& value) -{ - unsigned char data[16]; - value.getFixedWidthValue<16>(data); - return qpid::types::Uuid(data); -} - -Variant toVariant(boost::shared_ptr<FieldValue> in) -{ - Variant out; - //based on AMQP 0-10 typecode, pick most appropriate variant type - switch (in->getType()) { - //Fixed Width types: - case 0x01: out.setEncoding(amqp0_10_binary); - case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; - case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; - case 0x04: break; //TODO: iso-8859-15 char - case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; - case 0x10: out.setEncoding(amqp0_10_binary); - case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; - case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; - case 0x20: out.setEncoding(amqp0_10_binary); - case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; - case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; - case 0x23: out = in->get<float>(); break; - case 0x27: break; //TODO: utf-32 char - case 0x30: out.setEncoding(amqp0_10_binary); - case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; - case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding - case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; - case 0x33: out = in->get<double>(); break; - - case 0x48: out = getUuid(*in); break; - - //TODO: figure out whether and how to map values with codes 0x40-0xd8 - - case 0xf0: break;//void, which is the default value for Variant - case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant - - //Variable Width types: - //strings: - case 0x80: - case 0x84: - case 0x85: - case 0x86: - case 0x90: - case 0x94: - case 0x95: - case 0x96: - case 0xa0: - case 0xab: - out = in->get<std::string>(); - setEncodingFor(out, in->getType()); - break; - - case 0xa8: - out = Variant::Map(); - translate<FieldTable>(in, out.asMap(), &toVariantMapEntry); - break; - - case 0xa9: - out = Variant::List(); - translate<List>(in, out.asList(), &toVariant); - break; - case 0xaa: //convert amqp0-10 array into variant list - out = Variant::List(); - translate<Array>(in, out.asList(), &toVariant); - break; - - default: - //error? - break; - } - return out; -} - -boost::shared_ptr<FieldValue> convertString(const std::string& value, const std::string& encoding) -{ - bool large = value.size() > std::numeric_limits<uint16_t>::max(); - if (encoding.empty() || encoding == amqp0_10_binary || encoding == binary) { - if (large) { - return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0)); - } else { - return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x90)); - } - } else if (encoding == utf8 && !large) { - return boost::shared_ptr<FieldValue>(new Str16Value(value)); - } else if (encoding == utf16 && !large) { - return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x96)); - } else if (encoding == iso885915 && !large) { - return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x94)); - } else { - //either the string is too large for the encoding in amqp 0-10, or the encoding was not recognised - QPID_LOG(warning, "Could not encode " << value.size() << " byte value as " << encoding << ", encoding as vbin32."); - return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0)); - } -} - -boost::shared_ptr<FieldValue> toFieldValue(const Variant& in) -{ - boost::shared_ptr<FieldValue> out; - switch (in.getType()) { - case VAR_VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break; - case VAR_BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break; - case VAR_UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break; - case VAR_UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break; - case VAR_UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break; - case VAR_UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break; - case VAR_INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break; - case VAR_INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break; - case VAR_INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break; - case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break; - case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break; - case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break; - case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break; - case VAR_UUID: out = boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); break; - case VAR_MAP: - out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap())); - break; - case VAR_LIST: - out = boost::shared_ptr<FieldValue>(toListValue(in.asList())); - break; - } - return out; -} - -Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in) -{ - return Variant::Map::value_type(in.first, toVariant(in.second)); -} - -FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in) -{ - return FieldTable::value_type(in.first, toFieldValue(in.second)); -} - -struct EncodeBuffer -{ - char* data; - Buffer buffer; - - EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {} - ~EncodeBuffer() { delete[] data; } - - template <class T> void encode(T& t) { t.encode(buffer); } - - void getData(std::string& s) { - s.assign(data, buffer.getSize()); - } -}; - -struct DecodeBuffer -{ - Buffer buffer; - - DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {} - - template <class T> void decode(T& t) { t.decode(buffer); } - -}; - -template <class T, class U, class F> void _encode(const U& value, std::string& data, F f) -{ - T t; - convert(value, t, f); - EncodeBuffer buffer(t.encodedSize()); - buffer.encode(t); - buffer.getData(data); -} - -template <class T, class U, class F> void _decode(const std::string& data, U& value, F f) -{ - T t; - DecodeBuffer buffer(data); - buffer.decode(t); - convert(t, value, f); -} - -void MapCodec::encode(const Variant& value, std::string& data) -{ - _encode<FieldTable>(value.asMap(), data, &toFieldTableEntry); -} - -void MapCodec::decode(const std::string& data, Variant& value) -{ - value = Variant::Map(); - _decode<FieldTable>(data, value.asMap(), &toVariantMapEntry); -} - -void ListCodec::encode(const Variant& value, std::string& data) -{ - _encode<List>(value.asList(), data, &toFieldValue); -} - -void ListCodec::decode(const std::string& data, Variant& value) -{ - value = Variant::List(); - _decode<List>(data, value.asList(), &toVariant); -} - -void translate(const Variant::Map& from, FieldTable& to) -{ - convert(from, to, &toFieldTableEntry); -} - -void translate(const FieldTable& from, Variant::Map& to) -{ - convert(from, to, &toVariantMapEntry); -} - -const std::string ListCodec::contentType("amqp/list"); -const std::string MapCodec::contentType("amqp/map"); - -}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/CodecsInternal.h b/cpp/src/qpid/client/amqp0_10/CodecsInternal.h deleted file mode 100644 index a110d80b8a..0000000000 --- a/cpp/src/qpid/client/amqp0_10/CodecsInternal.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H -#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/types/Variant.h" -#include "qpid/framing/FieldTable.h" - -namespace qpid { -namespace client { -namespace amqp0_10 { - -/** - * Declarations of a couple of conversion functions implemented in - * Codecs.cpp but not exposed through API - */ - -void translate(const qpid::types::Variant::Map& from, qpid::framing::FieldTable& to); -void translate(const qpid::framing::FieldTable& from, qpid::types::Variant::Map& to); - -}}} // namespace qpid::client::amqp0_10 - -#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp index 5e526a2ffc..354d6f6aba 100644 --- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp +++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp @@ -46,7 +46,7 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable FailoverUpdatesImpl(Connection& c) : connection(c), quit(false) { - session = connection.newSession("failover-updates"); + session = connection.createSession("failover-updates"); receiver = session.createReceiver("amq.failover"); thread = qpid::sys::Thread(*this); } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 345ebfb66d..c26b2eb09f 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -20,8 +20,7 @@ */ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/log/Statement.h" @@ -42,6 +41,7 @@ namespace amqp0_10 { using namespace qpid::framing; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::messaging::MessageImplAccess; @@ -306,7 +306,7 @@ void populate(qpid::messaging::Message& message, FrameSet& command) //e.g. for rejecting. MessageImplAccess::get(message).setInternalId(command.getId()); - command.getContent(message.getContent()); + message.setContent(command.getContent()); populateHeaders(message, command.getHeaders()); } diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index b19b26f903..c22eb5403f 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -20,8 +20,7 @@ */ #include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" @@ -34,6 +33,7 @@ namespace amqp0_10 { using qpid::messaging::Address; using qpid::messaging::MessageImplAccess; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; void OutgoingMessage::convert(const qpid::messaging::Message& from) { |
