diff options
Diffstat (limited to 'cpp/src/qpid/broker/XmlExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 139 |
1 files changed, 59 insertions, 80 deletions
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index cb0f9a9606..b51791796d 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -81,29 +81,23 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const try { RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == bindings.end()) { - - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); - XmlBinding::vector bindings(1, binding); - bindingsMap[routingKey] = bindings; - QPID_LOG(trace, "Bound successfully with query: " << queryText ); - - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - return true; - } else{ - return false; - } + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + bindings.add(binding); + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + return true; + } else { + return false; + } } catch (XQException& e) { throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); @@ -116,25 +110,14 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) { RWlock::ScopedWlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; - - for (i = bindings.begin(); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i < bindings.end()) { - bindings.erase(i); - if (bindings.empty()) { - bindingsMap.erase(routingKey); - } + if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { - return false; + return false; } } @@ -193,13 +176,15 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) { try { - RWlock::ScopedRlock l(lock); - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::iterator i; + XmlBinding::vector::ConstPtr p; + { + RWlock::ScopedRlock l(lock); + p = bindingsMap[routingKey].snapshot(); + if (!p) return; + } int count(0); - for (i = bindings.begin(); i != bindings.end(); i++) { - + for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... msg.deliverTo((*i)->queue); count++; @@ -208,28 +193,25 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); } - - if(!count){ - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } - else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } - } - } - catch (...) { + } + if (!count) { + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } else { + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } @@ -239,30 +221,27 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { - XmlBinding::vector::iterator j; - + RWlock::ScopedRlock l(lock); if (routingKey) { - XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); - if (i == bindingsMap.end()) - return false; - if (!queue) - return true; - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + if (i == bindingsMap.end()) + return false; + if (!queue) + return true; + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { - //if no queue or routing key is specified, just report whether any bindings exist - return bindingsMap.size() > 0; + //if no queue or routing key is specified, just report whether any bindings exist + return bindingsMap.size() > 0; } else { - for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; - return false; + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } + return false; } - return false; } |