summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DirectExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp71
1 files changed, 27 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 4aa68bee9c..bc6d7fe495 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -42,41 +42,22 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable,
}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == queues.end()) {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingKey].push_back(binding);
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding (routingKey, queue, this));
+ if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
return true;
- } else{
+ } else {
return false;
}
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i < queues.end()) {
- queues.erase(i);
- if (queues.empty()) {
- bindings.erase(routingKey);
- }
+ Mutex::ScopedLock l(lock);
+ if (bindings[routingKey].remove_if(MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -88,16 +69,20 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedRlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
+ Queues::ConstPtr p;
+ {
+ Mutex::ScopedLock l(lock);
+ p = bindings[routingKey].snapshot();
+ }
int count(0);
- for(i = queues.begin(); i != queues.end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
+ if (p) {
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+ }
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
@@ -105,8 +90,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
}
- }
- else {
+ } else {
if (mgmtExchange != 0) {
mgmtExchange->inc_msgRoutes (count);
mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
@@ -122,8 +106,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator j;
-
+ Mutex::ScopedLock l(lock);
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
@@ -131,17 +114,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin
return false;
if (!queue)
return true;
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+
+ Queues::ConstPtr p = i->second.snapshot();
+ return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ Queues::ConstPtr p = i->second.snapshot();
+ if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
+ }
return false;
}