diff options
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 71 |
1 files changed, 27 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 4aa68bee9c..bc6d7fe495 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -42,41 +42,22 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable, } bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == queues.end()) { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingKey].push_back(binding); + Mutex::ScopedLock l(lock); + Binding::shared_ptr b(new Binding (routingKey, queue, this)); + if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; - } else{ + } else { return false; } } bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i < queues.end()) { - queues.erase(i); - if (queues.empty()) { - bindings.erase(routingKey); - } + Mutex::ScopedLock l(lock); + if (bindings[routingKey].remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -88,16 +69,20 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedRlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; + Queues::ConstPtr p; + { + Mutex::ScopedLock l(lock); + p = bindings[routingKey].snapshot(); + } int count(0); - for(i = queues.begin(); i != queues.end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } + if (p) { + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); @@ -105,8 +90,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } - } - else { + } else { if (mgmtExchange != 0) { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); @@ -122,8 +106,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator j; - + Mutex::ScopedLock l(lock); if (routingKey) { Bindings::iterator i = bindings.find(*routingKey); @@ -131,17 +114,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin return false; if (!queue) return true; - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + + Queues::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + Queues::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } return false; } |