summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-09-20 15:43:34 +0000
committerGordon Sim <gsim@apache.org>2013-09-20 15:43:34 +0000
commitabd0ec3213fa84421f6c845113fd129060d3ccda (patch)
tree1f59961b234e8de5372abed4c8dc19e0194f0e6a /qpid/cpp/src
parente92fbd1a9020c95ced9f664f8f706d28463083a1 (diff)
downloadqpid-python-abd0ec3213fa84421f6c845113fd129060d3ccda.tar.gz
QPID-5146: fix handling of capabilities
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525040 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp121
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp39
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp4
3 files changed, 118 insertions, 46 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 0344ea537e..4627d724a5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -57,15 +57,16 @@ namespace broker {
namespace amqp {
namespace {
-bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+pn_bytes_t convert(const std::string& s)
{
- pn_data_rewind(capabilities);
- while (pn_data_next(capabilities)) {
- pn_bytes_t c = pn_data_get_symbol(capabilities);
- std::string s(c.start, c.size);
- if (s == name) return true;
- }
- return false;
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+std::string convert(pn_bytes_t in)
+{
+ return std::string(in.start, in.size);
}
//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
@@ -76,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
const std::string SHARED("shared");
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
- pn_data_put_symbol(out, c);
+ if (supported.size() == 1) {
+ pn_data_put_symbol(out, convert(supported.front()));
+ } else if (supported.size() > 1) {
+ pn_data_put_array(out, false, PN_SYMBOL);
+ pn_data_enter(out);
+ for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
+ pn_data_put_symbol(out, convert(*i));
}
+ pn_data_exit(out);
}
}
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+template <class F>
+void readCapabilities(pn_data_t* data, F f)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == SHARED) {
- pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
- pn_data_put_symbol(out, c);
- } else if (s == DIRECT_FILTER) {
- if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
- } else if (s == TOPIC_FILTER) {
- if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
+ pn_data_rewind(data);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ f(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ f(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
}
}
+void matchCapability(const std::string& name, bool* result, const std::string& s)
+{
+ if (s == name) *result = true;
+}
+
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+ bool result(false);
+ readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
+ return result;
+}
+
+void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+ supported->push_back(s);
+ }
+}
+
+void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == SHARED) {
+ supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+ supported->push_back(s);
+ } else if (s == DIRECT_FILTER) {
+ if (node->getType() == DirectExchange::typeName) supported->push_back(s);
+ } else if (s == TOPIC_FILTER) {
+ if (node->getType() == TopicExchange::typeName) supported->push_back(s);
+ }
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
}
class IncomingToQueue : public DecodingIncoming
@@ -150,6 +201,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
if (node.topic) node.exchange = node.topic->getExchange();
+ if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+ node.properties.read(pn_terminus_properties(terminus));
+ if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
+ }
+ }
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 603ead7074..beac50cdac 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -426,16 +426,23 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
QPID_LOG(debug, "checking assertions: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
- if (type.size()) desired.insert(type);
- if (durableNode) desired.insert(DURABLE);
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- desired.insert(i->asString());
+ if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
}
pn_data_t* data = pn_terminus_capabilities(terminus);
- while (pn_data_next(data)) {
- pn_bytes_t c = pn_data_get_symbol(data);
- std::string s(c.start, c.size);
- desired.erase(s);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+ }
}
if (desired.size()) {
@@ -614,12 +621,20 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
{
+ if (create) capabilities.push_back(CREATE_ON_DEMAND);
+ if (!type.empty()) capabilities.push_back(type);
+ if (durableNode) capabilities.push_back(DURABLE);
+
pn_data_t* data = pn_terminus_capabilities(terminus);
- if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
- if (type.size()) pn_data_put_symbol(data, convert(type));
- if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
- for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- pn_data_put_symbol(data, convert(i->asString()));
+ if (capabilities.size() == 1) {
+ pn_data_put_symbol(data, convert(capabilities.front().asString()));
+ } else if (capabilities.size() > 1) {
+ pn_data_put_array(data, false, PN_SYMBOL);
+ pn_data_enter(data);
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+ pn_data_exit(data);
}
}
std::string AddressHelper::getLinkName(const Address& address)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index afa2db6791..f3b03f4c84 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -265,8 +265,8 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
attach(lnk->sender);
- lnk->verify();
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -275,8 +275,8 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
attach(lnk->receiver, lnk->capacity);
- lnk->verify();
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}