summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-09-08 11:13:38 +0000
committerGordon Sim <gsim@apache.org>2008-09-08 11:13:38 +0000
commit27ca6af6141f088f3abff585248393fd26823103 (patch)
tree0a2680232934eb5fc9490c484d71649049646662 /cpp/src/qpid/broker
parent028745dbc3c47bd6561310678f82f15bd45678d9 (diff)
downloadqpid-python-27ca6af6141f088f3abff585248393fd26823103.tar.gz
QPID-1264: initial fix for fanout, direct and headers exchanges (fix for remaining types to follow)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp71
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h7
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp8
-rw-r--r--cpp/src/qpid/broker/Exchange.h6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp42
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h7
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp56
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h22
8 files changed, 100 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 4aa68bee9c..bc6d7fe495 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -42,41 +42,22 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable,
}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == queues.end()) {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingKey].push_back(binding);
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding (routingKey, queue, this));
+ if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
return true;
- } else{
+ } else {
return false;
}
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i < queues.end()) {
- queues.erase(i);
- if (queues.empty()) {
- bindings.erase(routingKey);
- }
+ Mutex::ScopedLock l(lock);
+ if (bindings[routingKey].remove_if(MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -88,16 +69,20 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedRlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
+ Queues::ConstPtr p;
+ {
+ Mutex::ScopedLock l(lock);
+ p = bindings[routingKey].snapshot();
+ }
int count(0);
- for(i = queues.begin(); i != queues.end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
+ if (p) {
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+ }
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
@@ -105,8 +90,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
}
- }
- else {
+ } else {
if (mgmtExchange != 0) {
mgmtExchange->inc_msgRoutes (count);
mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
@@ -122,8 +106,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator j;
-
+ Mutex::ScopedLock l(lock);
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
@@ -131,17 +114,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin
return false;
if (!queue)
return true;
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+
+ Queues::ConstPtr p = i->second.snapshot();
+ return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ Queues::ConstPtr p = i->second.snapshot();
+ if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
+ }
return false;
}
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 118f2ed4d3..2516ce4a13 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -25,16 +25,17 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
#include "Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- typedef std::vector<Binding::shared_ptr> Queues;
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
typedef std::map<string, Queues> Bindings;
Bindings bindings;
- qpid::sys::RWlock lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 6416e2fc73..b40d6f9ed9 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -153,3 +153,11 @@ Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&)
{
return Manageable::STATUS_UNKNOWN_METHOD;
}
+
+
+Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
+
+bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
+{
+ return b->queue == queue;
+}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index f4ac4373e4..2e3b5ba13a 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -62,6 +62,12 @@ namespace qpid {
management::ManagementObject* GetManagementObject () const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
};
+ struct MatchQueue
+ {
+ const Queue::shared_ptr queue;
+ MatchQueue(Queue::shared_ptr q);
+ bool operator()(Exchange::Binding::shared_ptr b);
+ };
management::Exchange* mgmtExchange;
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 373e9ab1cc..019c943ca1 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -40,18 +40,10 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedWlock locker(lock);
- std::vector<Binding::shared_ptr>::iterator i;
-
- // Add if not already present.
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == bindings.end()) {
- Binding::shared_ptr binding (new Binding ("", queue, this));
- bindings.push_back(binding);
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ if (bindings.add_unless(binding, MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -62,16 +54,9 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
}
}
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedWlock locker(lock);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i != bindings.end()) {
- bindings.erase(i);
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+{
+ if (bindings.remove_if(MatchQueue(queue))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -83,10 +68,10 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
- RWlock::ScopedRlock locker(lock);
uint32_t count(0);
- for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ BindingsArray::ConstPtr p = bindings.snapshot();
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
msg.deliverTo((*i)->queue);
if ((*i)->mgmtBinding != 0)
(*i)->mgmtBinding->inc_msgMatched ();
@@ -111,13 +96,8 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = bindings.begin (); i != bindings.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- return i != bindings.end();
+ BindingsArray::ConstPtr ptr = bindings.snapshot();
+ return ptr && std::find_if(ptr->begin(), ptr->end(), MatchQueue(queue)) != ptr->end();
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 4bc92f6b28..cfe9875024 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -25,16 +25,15 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
#include "Queue.h"
namespace qpid {
namespace broker {
class FanOutExchange : public virtual Exchange {
- std::vector<Binding::shared_ptr> bindings;
- qpid::sys::RWlock lock;
-
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
+ BindingsArray bindings;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 54519a7bf6..f7842239da 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -73,22 +73,12 @@ std::string HeadersExchange::getMatch(const FieldTable* args)
}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
std::string what = getMatch(args);
if (what != all && what != any)
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Bindings::iterator i;
-
- for (i = bindings.begin(); i != bindings.end(); i++)
- if (i->first == *args && i->second->queue == queue)
- break;
-
- if (i == bindings.end()) {
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
- HeaderMap headerMap(*args, binding);
-
- bindings.push_back(headerMap);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+ if (bindings.add_unless(binding, MatchArgs(queue, args))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -99,21 +89,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
}
}
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- RWlock::ScopedWlock locker(lock);
- Bindings::iterator i;
- for (i = bindings.begin(); i != bindings.end(); i++) {
- if (bindingKey.empty() && args) {
- if (i->first == *args && i->second->queue == queue)
- break;
- } else {
- if (i->second->key == bindingKey && i->second->queue == queue)
- break;
- }
- }
-
- if (i != bindings.end()) {
- bindings.erase(i);
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
+ if (bindings.remove_if(MatchKey(queue, bindingKey))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
@@ -128,13 +105,13 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
- RWlock::ScopedRlock locker(lock);
uint32_t count(0);
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
- if (match(i->first, *args)) msg.deliverTo(i->second->queue);
- if (i->second->mgmtBinding != 0)
- i->second->mgmtBinding->inc_msgMatched ();
+ Bindings::ConstPtr p = bindings.snapshot();
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
+ if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
}
if (mgmtExchange != 0)
@@ -157,8 +134,9 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
+ Bindings::ConstPtr p = bindings.snapshot();
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
return true;
}
}
@@ -227,5 +205,15 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
return true;
}
+HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {}
+bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
+{
+ return b->queue == queue && b->args == *args;
+}
+HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {}
+bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
+{
+ return b->queue == queue && b->key == key;
+}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 6e101e193a..e10fab2250 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -24,7 +24,8 @@
#include <vector>
#include "Exchange.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/CopyOnWriteArray.h"
+#include "qpid/sys/Mutex.h"
#include "Queue.h"
namespace qpid {
@@ -33,10 +34,25 @@ namespace broker {
class HeadersExchange : public virtual Exchange {
typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
- typedef std::vector<HeaderMap> Bindings;
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Bindings;
+
+ struct MatchArgs
+ {
+ const Queue::shared_ptr queue;
+ const qpid::framing::FieldTable* args;
+ MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a);
+ bool operator()(Exchange::Binding::shared_ptr b);
+ };
+ struct MatchKey
+ {
+ const Queue::shared_ptr queue;
+ const std::string& key;
+ MatchKey(Queue::shared_ptr q, const std::string& k);
+ bool operator()(Exchange::Binding::shared_ptr b);
+ };
Bindings bindings;
- qpid::sys::RWlock lock;
+ qpid::sys::Mutex lock;
static std::string getMatch(const framing::FieldTable* args);