diff options
| author | Gordon Sim <gsim@apache.org> | 2008-09-08 11:13:38 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-09-08 11:13:38 +0000 |
| commit | 27ca6af6141f088f3abff585248393fd26823103 (patch) | |
| tree | 0a2680232934eb5fc9490c484d71649049646662 /cpp/src/qpid/broker | |
| parent | 028745dbc3c47bd6561310678f82f15bd45678d9 (diff) | |
| download | qpid-python-27ca6af6141f088f3abff585248393fd26823103.tar.gz | |
QPID-1264: initial fix for fanout, direct and headers exchanges (fix for remaining types to follow)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 71 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 42 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 22 |
8 files changed, 100 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 4aa68bee9c..bc6d7fe495 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -42,41 +42,22 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable, } bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == queues.end()) { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingKey].push_back(binding); + Mutex::ScopedLock l(lock); + Binding::shared_ptr b(new Binding (routingKey, queue, this)); + if (bindings[routingKey].add_unless(b, MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; - } else{ + } else { return false; } } bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedWlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = queues.begin(); i != queues.end(); i++) - if ((*i)->queue == queue) - break; - - if (i < queues.end()) { - queues.erase(i); - if (queues.empty()) { - bindings.erase(routingKey); - } + Mutex::ScopedLock l(lock); + if (bindings[routingKey].remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -88,16 +69,20 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedRlock l(lock); - std::vector<Binding::shared_ptr>& queues(bindings[routingKey]); - std::vector<Binding::shared_ptr>::iterator i; + Queues::ConstPtr p; + { + Mutex::ScopedLock l(lock); + p = bindings[routingKey].snapshot(); + } int count(0); - for(i = queues.begin(); i != queues.end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } + if (p) { + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); @@ -105,8 +90,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } - } - else { + } else { if (mgmtExchange != 0) { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); @@ -122,8 +106,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator j; - + Mutex::ScopedLock l(lock); if (routingKey) { Bindings::iterator i = bindings.find(*routingKey); @@ -131,17 +114,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin return false; if (!queue) return true; - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + + Queues::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); } else if (!queue) { //if no queue or routing key is specified, just report whether any bindings exist return bindings.size() > 0; } else { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) - for (j = i->second.begin(); j != i->second.end(); j++) - if ((*j)->queue == queue) - return true; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + Queues::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } return false; } diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 118f2ed4d3..2516ce4a13 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -25,16 +25,17 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" +#include "qpid/sys/Mutex.h" #include "Queue.h" namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ - typedef std::vector<Binding::shared_ptr> Queues; + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues; typedef std::map<string, Queues> Bindings; Bindings bindings; - qpid::sys::RWlock lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 6416e2fc73..b40d6f9ed9 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -153,3 +153,11 @@ Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&) { return Manageable::STATUS_UNKNOWN_METHOD; } + + +Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} + +bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue; +} diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index f4ac4373e4..2e3b5ba13a 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -62,6 +62,12 @@ namespace qpid { management::ManagementObject* GetManagementObject () const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); }; + struct MatchQueue + { + const Queue::shared_ptr queue; + MatchQueue(Queue::shared_ptr q); + bool operator()(Exchange::Binding::shared_ptr b); + }; management::Exchange* mgmtExchange; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 373e9ab1cc..019c943ca1 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -40,18 +40,10 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; - - // Add if not already present. - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding ("", queue, this)); - bindings.push_back(binding); +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +{ + Binding::shared_ptr binding (new Binding ("", queue, this)); + if (bindings.add_unless(binding, MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -62,16 +54,9 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, } } -bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedWlock locker(lock); - std::vector<Binding::shared_ptr>::iterator i; - - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - if (i != bindings.end()) { - bindings.erase(i); +bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/) +{ + if (bindings.remove_if(MatchQueue(queue))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -83,10 +68,10 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - RWlock::ScopedRlock locker(lock); uint32_t count(0); - for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ + BindingsArray::ConstPtr p = bindings.snapshot(); + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ msg.deliverTo((*i)->queue); if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); @@ -111,13 +96,8 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { - std::vector<Binding::shared_ptr>::iterator i; - - for (i = bindings.begin (); i != bindings.end(); i++) - if ((*i)->queue == queue) - break; - - return i != bindings.end(); + BindingsArray::ConstPtr ptr = bindings.snapshot(); + return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end(); } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 4bc92f6b28..cfe9875024 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -25,16 +25,15 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" #include "Queue.h" namespace qpid { namespace broker { class FanOutExchange : public virtual Exchange { - std::vector<Binding::shared_ptr> bindings; - qpid::sys::RWlock lock; - + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray; + BindingsArray bindings; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 54519a7bf6..f7842239da 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -73,22 +73,12 @@ std::string HeadersExchange::getMatch(const FieldTable* args) } bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); std::string what = getMatch(args); if (what != all && what != any) throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); - Bindings::iterator i; - - for (i = bindings.begin(); i != bindings.end(); i++) - if (i->first == *args && i->second->queue == queue) - break; - - if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); - HeaderMap headerMap(*args, binding); - - bindings.push_back(headerMap); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); + if (bindings.add_unless(binding, MatchArgs(queue, args))) { if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); @@ -99,21 +89,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co } } -bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){ - RWlock::ScopedWlock locker(lock); - Bindings::iterator i; - for (i = bindings.begin(); i != bindings.end(); i++) { - if (bindingKey.empty() && args) { - if (i->first == *args && i->second->queue == queue) - break; - } else { - if (i->second->key == bindingKey && i->second->queue == queue) - break; - } - } - - if (i != bindings.end()) { - bindings.erase(i); +bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){ + if (bindings.remove_if(MatchKey(queue, bindingKey))) { if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); @@ -128,13 +105,13 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ if (!args) return;//can't match if there were no headers passed in - RWlock::ScopedRlock locker(lock); uint32_t count(0); - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { - if (match(i->first, *args)) msg.deliverTo(i->second->queue); - if (i->second->mgmtBinding != 0) - i->second->mgmtBinding->inc_msgMatched (); + Bindings::ConstPtr p = bindings.snapshot(); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { + if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); } if (mgmtExchange != 0) @@ -157,8 +134,9 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) { + Bindings::ConstPtr p = bindings.snapshot(); + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { return true; } } @@ -227,5 +205,15 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) { return true; } +HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {} +bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->args == *args; +} +HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {} +bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue && b->key == key; +} diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 6e101e193a..e10fab2250 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -24,7 +24,8 @@ #include <vector> #include "Exchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/CopyOnWriteArray.h" +#include "qpid/sys/Mutex.h" #include "Queue.h" namespace qpid { @@ -33,10 +34,25 @@ namespace broker { class HeadersExchange : public virtual Exchange { typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap; - typedef std::vector<HeaderMap> Bindings; + typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Bindings; + + struct MatchArgs + { + const Queue::shared_ptr queue; + const qpid::framing::FieldTable* args; + MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a); + bool operator()(Exchange::Binding::shared_ptr b); + }; + struct MatchKey + { + const Queue::shared_ptr queue; + const std::string& key; + MatchKey(Queue::shared_ptr q, const std::string& k); + bool operator()(Exchange::Binding::shared_ptr b); + }; Bindings bindings; - qpid::sys::RWlock lock; + qpid::sys::Mutex lock; static std::string getMatch(const framing::FieldTable* args); |
