From 443e02c688ef78b0cb7ae9c761fd2c42a383fcac Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 29 Mar 2013 21:13:33 +0000 Subject: QPID-4679: Cleaned up processing of addresses, including errors and warnings where elements can not be supported git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1462646 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 45 +++++- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 179 ++++++++++++++++++--- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 15 +- .../src/qpid/messaging/amqp/ConnectionContext.cpp | 2 + .../src/qpid/messaging/amqp/ReceiverContext.cpp | 27 ++-- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 7 +- qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 22 +-- qpid/cpp/src/qpid/messaging/amqp/SenderContext.h | 7 +- 8 files changed, 241 insertions(+), 63 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index c779f47135..ee55a3e08b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -63,8 +63,44 @@ bool is_capability_requested(const std::string& name, pn_data_t* capabilities) } return false; } - +//capabilities const std::string CREATE_ON_DEMAND("create-on-demand"); +const std::string DURABLE("durable"); +const std::string QUEUE("queue"); +const std::string TOPIC("topic"); +const std::string DIRECT_FILTER("legacy-amqp-direct-binding"); +const std::string TOPIC_FILTER("legacy-amqp-topic-binding"); + +void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr node) +{ + while (pn_data_next(in)) { + pn_bytes_t c = pn_data_get_symbol(in); + std::string s(c.start, c.size); + if (s == DURABLE) { + if (node->isDurable()) pn_data_put_symbol(out, c); + } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) { + pn_data_put_symbol(out, c); + } + } +} + +void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr node) +{ + while (pn_data_next(in)) { + pn_bytes_t c = pn_data_get_symbol(in); + std::string s(c.start, c.size); + if (s == DURABLE) { + if (node->isDurable()) pn_data_put_symbol(out, c); + } else if (s == CREATE_ON_DEMAND || s == TOPIC) { + pn_data_put_symbol(out, c); + } else if (s == DIRECT_FILTER) { + if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c); + } else if (s == TOPIC_FILTER) { + if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c); + } + } +} + } class IncomingToQueue : public DecodingIncoming @@ -178,6 +214,10 @@ void Session::attach(pn_link_t* link) void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name) { ResolvedNode node = resolve(name, target, true); + //set capabilities + if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue); + else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); + const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); if (!sourceAddress) { sourceAddress = pn_terminus_get_address(pn_link_source(link)); @@ -205,6 +245,9 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name) { ResolvedNode node = resolve(name, source, false); + if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue); + else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange); + Filter filter; filter.read(pn_terminus_filter(source)); const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link)); diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index a46606a526..7b9934fb26 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -20,8 +20,10 @@ */ #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" #include "qpid/log/Statement.h" #include +#include #include extern "C" { #include @@ -47,10 +49,13 @@ const std::string SENDER("sender"); const std::string NODE("node"); const std::string LINK("link"); +const std::string CAPABILITIES("capabilities"); +const std::string PROPERTIES("properties"); const std::string TYPE("type"); const std::string TOPIC("topic"); const std::string QUEUE("queue"); +const std::string DURABLE("durable"); //distribution modes: const std::string MOVE("move"); @@ -61,6 +66,11 @@ const std::string CREATE_ON_DEMAND("create-on-demand"); const std::string DUMMY("."); +const std::string X_DECLARE("x-declare"); +const std::string X_BINDINGS("x-bindings"); +const std::string X_SUBSCRIBE("x-subscribe"); +const std::string ARGUMENTS("arguments"); + const std::vector RECEIVER_MODES = boost::assign::list_of(ALWAYS) (RECEIVER); const std::vector SENDER_MODES = boost::assign::list_of(ALWAYS) (SENDER); @@ -72,6 +82,24 @@ pn_bytes_t convert(const std::string& s) return result; } +bool contains(const Variant::List& list, const std::string& item) +{ + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + if (*i == item) return true; + } + return false; +} + +bool test(const Variant::Map& options, const std::string& name) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + return j->second; + } +} + bool bind(const Variant::Map& options, const std::string& name, std::string& variable) { Variant::Map::const_iterator j = options.find(name); @@ -94,6 +122,17 @@ bool bind(const Variant::Map& options, const std::string& name, Variant::Map& va } } +bool bind(const Variant::Map& options, const std::string& name, Variant::List& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asList(); + return true; + } +} + bool bind(const Address& address, const std::string& name, std::string& variable) { return bind(address.getOptions(), name, variable); @@ -111,9 +150,23 @@ bool in(const std::string& value, const std::vector& choices) } return false; } +void add(Variant::Map& target, const Variant::Map& source) +{ + for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) { + target[i->first] = i->second; + } +} +void flatten(Variant::Map& base, const std::string& nested) +{ + Variant::Map::iterator i = base.find(nested); + if (i != base.end()) { + add(base, i->second.asMap()); + } + base.erase(i); +} } -AddressHelper::AddressHelper(const Address& address) +AddressHelper::AddressHelper(const Address& address) : isTemporary(AddressImpl::isTemporary(address)), name(address.getName()), type(address.getType()) { bind(address, CREATE, createPolicy); bind(address, DELETE, deletePolicy); @@ -121,20 +174,81 @@ AddressHelper::AddressHelper(const Address& address) bind(address, NODE, node); bind(address, LINK, link); + bind(node, PROPERTIES, properties); + bind(node, CAPABILITIES, capabilities); + durableNode = test(node, DURABLE); + + if (!deletePolicy.empty()) { + throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0."); + } + if (node.find(X_BINDINGS) != node.end()) { + throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_BINDINGS) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_SUBSCRIBE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0."); + } + if (link.find(X_DECLARE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0."); + } + //massage x-declare into properties + Variant::Map::iterator i = node.find(X_DECLARE); + if (i != node.end()) { + Variant::Map x_declare = i->second.asMap(); + flatten(x_declare, ARGUMENTS); + add(properties, x_declare); + node.erase(i); + } + + if (properties.size() && !(isTemporary || createPolicy.size())) { + QPID_LOG(warning, "Properties will be ignored! " << address); + } } -bool AddressHelper::createEnabled(CheckMode mode) const +void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) { - return enabled(createPolicy, mode); + if (assertEnabled(mode)) { + QPID_LOG(debug, "checking assertions: " << capabilities); + //ensure all desired capabilities have been offerred + std::set desired; + if (type.size()) desired.insert(type); + if (durableNode) desired.insert(DURABLE); + for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + desired.insert(i->asString()); + } + pn_data_t* data = pn_terminus_capabilities(terminus); + while (pn_data_next(data)) { + pn_bytes_t c = pn_data_get_symbol(data); + std::string s(c.start, c.size); + desired.erase(s); + } + + if (desired.size()) { + std::stringstream missing; + missing << "Desired capabilities not met: "; + bool first(true); + for (std::set::const_iterator i = desired.begin(); i != desired.end(); ++i) { + if (first) first = false; + else missing << ", "; + missing << *i; + } + throw qpid::messaging::AssertionFailed(missing.str()); + } + } } -bool AddressHelper::deleteEnabled(CheckMode mode) const + +bool AddressHelper::createEnabled(CheckMode mode) const { - return enabled(deletePolicy, mode); + return enabled(createPolicy, mode); } + bool AddressHelper::assertEnabled(CheckMode mode) const { return enabled(assertPolicy, mode); } + bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const { bool result = false; @@ -158,32 +272,53 @@ const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const return link; } -void AddressHelper::setNodeProperties(pn_terminus_t* terminus, bool dynamic) +void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) { - if (dynamic) { - pn_terminus_set_address(terminus, DUMMY.c_str());//Workaround for proton bug + bool createOnDemand(false); + if (isTemporary) { + //application expects a name to be generated + pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277 pn_terminus_set_dynamic(terminus, true); + setNodeProperties(terminus); } else { - pn_data_t* capabilities = pn_terminus_capabilities(terminus); - if (!capabilities) { - QPID_LOG(error, "!!!No capabilities!!!"); + pn_terminus_set_address(terminus, name.c_str()); + if (createEnabled(mode)) { + //application expects name of node to be as specified + setNodeProperties(terminus); + createOnDemand = true; } - pn_data_put_symbol(capabilities, convert(CREATE_ON_DEMAND)); } + setCapabilities(terminus, createOnDemand); +} - //properties for dynamically created node: - if (node.size()) { +void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) +{ + pn_data_t* data = pn_terminus_capabilities(terminus); + if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND)); + if (type.size()) pn_data_put_symbol(data, convert(type)); + if (durableNode) pn_data_put_symbol(data, convert(DURABLE)); + for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + pn_data_put_symbol(data, convert(i->asString())); + } +} + +void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +{ + if (properties.size() || type.size()) { pn_data_t* data = pn_terminus_properties(terminus); pn_data_put_map(data); pn_data_enter(data); - for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) { - if (i->first == TYPE) { - pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); - pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE)); - } else { - pn_data_put_symbol(data, convert(i->first)); - pn_data_put_string(data, convert(i->second.asString())); - } + if (type.size()) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE)); + } + if (durableNode) { + pn_data_put_symbol(data, convert(DURABLE)); + pn_data_put_bool(data, true); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_string(data, convert(i->second.asString())); } pn_data_exit(data); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 2442619ed3..4dd441d461 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -36,22 +36,29 @@ class AddressHelper enum CheckMode {FOR_RECEIVER, FOR_SENDER}; AddressHelper(const Address& address); - bool createEnabled(CheckMode mode) const; - bool deleteEnabled(CheckMode mode) const; - bool assertEnabled(CheckMode mode) const; + void configure(pn_terminus_t* terminus, CheckMode mode); + void checkAssertion(pn_terminus_t* terminus, CheckMode mode); - void setNodeProperties(pn_terminus_t*, bool dynamic); const qpid::types::Variant::Map& getNodeProperties() const; const qpid::types::Variant::Map& getLinkProperties() const; private: + bool isTemporary; std::string createPolicy; std::string assertPolicy; std::string deletePolicy; qpid::types::Variant::Map node; qpid::types::Variant::Map link; + qpid::types::Variant::Map properties; + qpid::types::Variant::List capabilities; std::string name; + std::string type; + bool durableNode; bool enabled(const std::string& policy, CheckMode mode) const; + bool createEnabled(CheckMode mode) const; + bool assertEnabled(CheckMode mode) const; + void setCapabilities(pn_terminus_t* terminus, bool create); + void setNodeProperties(pn_terminus_t* terminus); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9036031931..5f6fda81c5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -299,6 +299,7 @@ void ConnectionContext::attach(boost::shared_ptr ssn, boost::sha lnk->address.setName(pn_terminus_get_address(t)); QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName()); } + lnk->verify(t); QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); } @@ -316,6 +317,7 @@ void ConnectionContext::attach(boost::shared_ptr ssn, boost::sha lnk->address.setName(pn_terminus_get_address(s)); QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName()); } + lnk->verify(s); QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index f7b06ddc05..a495bc25c6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -19,7 +19,6 @@ * */ #include "qpid/messaging/amqp/ReceiverContext.h" -#include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" @@ -36,6 +35,7 @@ namespace amqp { ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), address(a), + helper(address), receiver(pn_receiver(session, name.c_str())), capacity(0) {} ReceiverContext::~ReceiverContext() @@ -108,26 +108,17 @@ uint64_t getFilterDescriptor(const std::string& key) return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; } } - -void ReceiverContext::configure() const +void ReceiverContext::verify(pn_terminus_t* source) +{ + helper.checkAssertion(source, AddressHelper::FOR_RECEIVER); +} +void ReceiverContext::configure() { configure(pn_link_source(receiver)); } -void ReceiverContext::configure(pn_terminus_t* source) const -{ - //dynamic create: - AddressHelper helper(address); - if (AddressImpl::isTemporary(address)) { - //application expects a name to be generated - QPID_LOG(debug, "source is dynamic"); - helper.setNodeProperties(source, true); - } else { - pn_terminus_set_address(source, address.getName().c_str()); - if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) { - //application expects name of node to be as specified - helper.setNodeProperties(source, false); - } - } +void ReceiverContext::configure(pn_terminus_t* source) +{ + helper.configure(source, AddressHelper::FOR_RECEIVER); // Look specifically for qpid.selector link property and add a filter for it qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("selector"); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 9c5386157b..79049d9263 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -22,6 +22,7 @@ * */ #include "qpid/messaging/Address.h" +#include "qpid/messaging/amqp/AddressHelper.h" #include #include "qpid/sys/IntegerTypes.h" @@ -54,15 +55,17 @@ class ReceiverContext const std::string& getName() const; const std::string& getSource() const; bool isClosed() const; - void configure() const; + void configure(); + void verify(pn_terminus_t*); Address getAddress() const; private: friend class ConnectionContext; const std::string name; Address address; + AddressHelper helper; pn_link_t* receiver; uint32_t capacity; - void configure(pn_terminus_t*) const; + void configure(pn_terminus_t*); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index fe74a4bca8..0b3de7e680 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -20,7 +20,6 @@ */ #include "qpid/messaging/amqp/SenderContext.h" #include "qpid/messaging/amqp/EncodedMessage.h" -#include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" @@ -41,6 +40,7 @@ namespace amqp { SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), address(a), + helper(address), sender(pn_sender(session, n.c_str())), capacity(1000) {} SenderContext::~SenderContext() @@ -342,23 +342,17 @@ void SenderContext::Delivery::settle() { pn_delivery_settle(token); } -void SenderContext::configure() const +void SenderContext::verify(pn_terminus_t* target) +{ + helper.checkAssertion(target, AddressHelper::FOR_SENDER); +} +void SenderContext::configure() { configure(pn_link_target(sender)); } -void SenderContext::configure(pn_terminus_t* target) const +void SenderContext::configure(pn_terminus_t* target) { - AddressHelper helper(address); - if (AddressImpl::isTemporary(address)) { - //application expects a name to be generated - helper.setNodeProperties(target, true); - } else { - pn_terminus_set_address(target, address.getName().c_str()); - if (helper.createEnabled(AddressHelper::FOR_SENDER)) { - //application expects name of node to be as specified - helper.setNodeProperties(target, false); - } - } + helper.configure(target, AddressHelper::FOR_SENDER); } bool SenderContext::settled() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 2969e75a16..ba563af2dc 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -26,6 +26,7 @@ #include #include "qpid/sys/IntegerTypes.h" #include "qpid/messaging/Address.h" +#include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/amqp/EncodedMessage.h" struct pn_delivery_t; @@ -69,7 +70,8 @@ class SenderContext const std::string& getName() const; const std::string& getTarget() const; Delivery* send(const qpid::messaging::Message& message); - void configure() const; + void configure(); + void verify(pn_terminus_t*); bool settled(); Address getAddress() const; private: @@ -78,13 +80,14 @@ class SenderContext const std::string name; qpid::messaging::Address address; + AddressHelper helper; pn_link_t* sender; int32_t nextId; Deliveries deliveries; uint32_t capacity; uint32_t processUnsettled(); - void configure(pn_terminus_t*) const; + void configure(pn_terminus_t*); }; }}} // namespace qpid::messaging::amqp -- cgit v1.2.1