diff options
| author | Gordon Sim <gsim@apache.org> | 2013-09-20 15:43:34 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-09-20 15:43:34 +0000 |
| commit | abd0ec3213fa84421f6c845113fd129060d3ccda (patch) | |
| tree | 1f59961b234e8de5372abed4c8dc19e0194f0e6a /qpid/cpp/src | |
| parent | e92fbd1a9020c95ced9f664f8f706d28463083a1 (diff) | |
| download | qpid-python-abd0ec3213fa84421f6c845113fd129060d3ccda.tar.gz | |
QPID-5146: fix handling of capabilities
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525040 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 121 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 39 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 4 |
3 files changed, 118 insertions, 46 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 0344ea537e..4627d724a5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -57,15 +57,16 @@ namespace broker { namespace amqp { namespace { -bool is_capability_requested(const std::string& name, pn_data_t* capabilities) +pn_bytes_t convert(const std::string& s) { - pn_data_rewind(capabilities); - while (pn_data_next(capabilities)) { - pn_bytes_t c = pn_data_get_symbol(capabilities); - std::string s(c.start, c.size); - if (s == name) return true; - } - return false; + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} +std::string convert(pn_bytes_t in) +{ + return std::string(in.start, in.size); } //capabilities const std::string CREATE_ON_DEMAND("create-on-demand"); @@ -76,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-amqp-direct-binding"); const std::string TOPIC_FILTER("legacy-amqp-topic-binding"); const std::string SHARED("shared"); -void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node) +void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported) { - pn_data_rewind(in); - while (pn_data_next(in)) { - pn_bytes_t c = pn_data_get_symbol(in); - std::string s(c.start, c.size); - if (s == DURABLE) { - if (node->isDurable()) pn_data_put_symbol(out, c); - } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) { - pn_data_put_symbol(out, c); + if (supported.size() == 1) { + pn_data_put_symbol(out, convert(supported.front())); + } else if (supported.size() > 1) { + pn_data_put_array(out, false, PN_SYMBOL); + pn_data_enter(out); + for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) { + pn_data_put_symbol(out, convert(*i)); } + pn_data_exit(out); } } -void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node) +template <class F> +void readCapabilities(pn_data_t* data, F f) { - pn_data_rewind(in); - while (pn_data_next(in)) { - pn_bytes_t c = pn_data_get_symbol(in); - std::string s(c.start, c.size); - if (s == DURABLE) { - if (node->isDurable()) pn_data_put_symbol(out, c); - } else if (s == SHARED) { - pn_data_put_symbol(out, c); - } else if (s == CREATE_ON_DEMAND || s == TOPIC) { - pn_data_put_symbol(out, c); - } else if (s == DIRECT_FILTER) { - if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c); - } else if (s == TOPIC_FILTER) { - if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c); + pn_data_rewind(data); + if (pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + if (type == PN_ARRAY) { + pn_data_enter(data); + while (pn_data_next(data)) { + f(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } else if (type == PN_SYMBOL) { + f(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); } } } +void matchCapability(const std::string& name, bool* result, const std::string& s) +{ + if (s == name) *result = true; +} + +bool is_capability_requested(const std::string& name, pn_data_t* capabilities) +{ + bool result(false); + readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1)); + return result; +} + +void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s) +{ + if (s == DURABLE) { + if (node->isDurable()) supported->push_back(s); + } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) { + supported->push_back(s); + } +} + +void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s) +{ + if (s == DURABLE) { + if (node->isDurable()) supported->push_back(s); + } else if (s == SHARED) { + supported->push_back(s); + } else if (s == CREATE_ON_DEMAND || s == TOPIC) { + supported->push_back(s); + } else if (s == DIRECT_FILTER) { + if (node->getType() == DirectExchange::typeName) supported->push_back(s); + } else if (s == TOPIC_FILTER) { + if (node->getType() == TopicExchange::typeName) supported->push_back(s); + } +} + +void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node) +{ + std::vector<std::string> supported; + readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1)); + writeCapabilities(out, supported); +} + +void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node) +{ + std::vector<std::string> supported; + readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1)); + writeCapabilities(out, supported); +} + } class IncomingToQueue : public DecodingIncoming @@ -150,6 +201,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.queue = connection.getBroker().getQueues().find(name); node.topic = connection.getTopics().get(name); if (node.topic) node.exchange = node.topic->getExchange(); + if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { + node.properties.read(pn_terminus_properties(terminus)); + if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) { + throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists"); + } + } if (!node.queue && !node.exchange) { if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { //is it a queue or an exchange? diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 603ead7074..beac50cdac 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -426,16 +426,23 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) QPID_LOG(debug, "checking assertions: " << capabilities); //ensure all desired capabilities have been offered std::set<std::string> desired; - if (type.size()) desired.insert(type); - if (durableNode) desired.insert(DURABLE); for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { - desired.insert(i->asString()); + if (*i != CREATE_ON_DEMAND) desired.insert(i->asString()); } pn_data_t* data = pn_terminus_capabilities(terminus); - while (pn_data_next(data)) { - pn_bytes_t c = pn_data_get_symbol(data); - std::string s(c.start, c.size); - desired.erase(s); + if (pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + if (type == PN_ARRAY) { + pn_data_enter(data); + while (pn_data_next(data)) { + desired.erase(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } else if (type == PN_SYMBOL) { + desired.erase(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); + } } if (desired.size()) { @@ -614,12 +621,20 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) { + if (create) capabilities.push_back(CREATE_ON_DEMAND); + if (!type.empty()) capabilities.push_back(type); + if (durableNode) capabilities.push_back(DURABLE); + pn_data_t* data = pn_terminus_capabilities(terminus); - if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND)); - if (type.size()) pn_data_put_symbol(data, convert(type)); - if (durableNode) pn_data_put_symbol(data, convert(DURABLE)); - for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { - pn_data_put_symbol(data, convert(i->asString())); + if (capabilities.size() == 1) { + pn_data_put_symbol(data, convert(capabilities.front().asString())); + } else if (capabilities.size() > 1) { + pn_data_put_array(data, false, PN_SYMBOL); + pn_data_enter(data); + for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + pn_data_put_symbol(data, convert(i->asString())); + } + pn_data_exit(data); } } std::string AddressHelper::getLinkName(const Address& address) diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index afa2db6791..f3b03f4c84 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -265,8 +265,8 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); lnk->configure(); attach(lnk->sender); - lnk->verify(); checkClosed(ssn, lnk); + lnk->verify(); QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); } @@ -275,8 +275,8 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); lnk->configure(); attach(lnk->receiver, lnk->capacity); - lnk->verify(); checkClosed(ssn, lnk); + lnk->verify(); QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); } |
