summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DirectExchange.cpp
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
commit248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch)
treed5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/qpid/broker/DirectExchange.cpp
parent3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff)
downloadqpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp189
1 files changed, 110 insertions, 79 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 4aa68bee9c..094f59cdec 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -19,111 +19,142 @@
*
*/
#include "qpid/log/Statement.h"
-#include "DirectExchange.h"
+#include "qpid/broker/DirectExchange.h"
#include <iostream>
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::management::Manageable;
+namespace _qmf = qmf::org::apache::qpid::broker;
-DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+const std::string qpidExclusiveBinding("qpid.exclusive-binding");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable,
- const FieldTable& _args, Manageable* _parent) :
- Exchange(_name, _durable, _args, _parent)
+DirectExchange::DirectExchange(const string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
-
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
-
- if (i == queues.end()) {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingKey].push_back(binding);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
- }
- return true;
- } else{
- return false;
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(fedOpBind);
+ string fedTags;
+ string fedOrigin;
+ bool exclusiveBinding = false;
+ if (args) {
+ fedOp = args->getAsString(qpidFedOp);
+ fedTags = args->getAsString(qpidFedTags);
+ fedOrigin = args->getAsString(qpidFedOrigin);
+ exclusiveBinding = args->get(qpidExclusiveBinding);
}
-}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedWlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
+ bool propagate = false;
- for (i = queues.begin(); i != queues.end(); i++)
- if ((*i)->queue == queue)
- break;
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingKey];
+ if (exclusiveBinding) bk.queues.clear();
- if (i < queues.end()) {
- queues.erase(i);
- if (queues.empty()) {
- bindings.erase(routingKey);
+ if (bk.queues.add_unless(b, MatchQueue(queue))) {
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ }
+ } else {
+ return false;
}
- if (mgmtExchange != 0) {
- mgmtExchange->dec_bindingCount();
- ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
+ } else if (fedOp == fedOpUnbind) {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ if (bk.fedBinding.count() == 0)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ /** gather up all the keys that need rebinding in a local vector
+ * while holding the lock. Then propagate once the lock is
+ * released
+ */
+ std::vector<std::string> keys2prop;
+ {
+ Mutex::ScopedLock l(lock);
+ for (Bindings::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ keys2prop.push_back(iter->first);
+ }
+ }
+ } /* lock dropped */
+ for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+ key != keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
}
- return true;
- } else {
- return false;
}
+
+ routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
+ return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- RWlock::ScopedRlock l(lock);
- std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Binding::shared_ptr>::iterator i;
- int count(0);
-
- for(i = queues.begin(); i != queues.end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
-
- if(!count){
- QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- }
- else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+ bool propagate = false;
+
+ {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ if (bk.queues.remove_if(MatchQueue(queue))) {
+ propagate = bk.fedBinding.delOrigin();
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ }
+ } else {
+ return false;
}
}
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ return true;
+}
+
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
+ PreRoute pr(msg, this);
+ ConstBindingList b;
+ {
+ Mutex::ScopedLock l(lock);
+ b = bindings[routingKey].queues.snapshot();
}
+ doRoute(msg, b);
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
- std::vector<Binding::shared_ptr>::iterator j;
-
+ Mutex::ScopedLock l(lock);
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
@@ -131,17 +162,17 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin
return false;
if (!queue)
return true;
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+
+ Queues::ConstPtr p = i->second.queues.snapshot();
+ return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
- for (j = i->second.begin(); j != i->second.end(); j++)
- if ((*j)->queue == queue)
- return true;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ Queues::ConstPtr p = i->second.queues.snapshot();
+ if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
+ }
return false;
}