diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 8 |
6 files changed, 57 insertions, 39 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index fc52ab3711..5b8104c77c 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (bk.queues.add_unless(b, MatchQueue(queue))) { b->startManagement(); - propagate = bk.fedBinding.addOrigin(fedOrigin); + propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { // queue already present - still need to track fedOrigin - bk.fedBinding.addOrigin(fedOrigin); + bk.fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } else if (fedOp == fedOpUnbind) { @@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con BoundKey& bk = bindings[routingKey]; QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " << queue->getName() - << " (origin=" << fedOrigin << ")"); + << " (origin=" << fedOrigin << ")" << " (count=" << bk.fedBinding.count() << ")"); - propagate = bk.fedBinding.delOrigin(fedOrigin); - if (bk.fedBinding.count() == 0) + propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin); + if (bk.fedBinding.countFedBindings(queue->getName()) == 0) unbind(queue, routingKey, 0); + } else if (fedOp == fedOpReorigin) { /** gather up all the keys that need rebinding in a local vector * while holding the lock. Then propagate once the lock is @@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } } + // If I delete my local binding, propagate this unbind to any upstream brokers if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); return true; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 26d7f41015..3c8b5ca2cd 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -95,46 +95,61 @@ protected: bool operator()(Exchange::Binding::shared_ptr b); }; + /** A FedBinding keeps track of information that Federation needs + to know when to propagate changes. + + Dynamic federation needs to know which exchanges have at least + one local binding. The bindings on these exchanges need to be + propagated. + + Federated binds and unbinds need to know which federation + origins are associated with the bindings for each queue. When + origins are added or deleted, the corresponding bindings need + to be propagated. + + fedBindings[queueName] contains the origins associated with + the given queue. + */ + class FedBinding { uint32_t localBindings; - std::set<std::string> originSet; + + typedef std::set<std::string> originSet; + std::map<std::string, originSet> fedBindings; + public: FedBinding() : localBindings(0) {} bool hasLocal() const { return localBindings != 0; } - /** - * Returns 'true' if and only if this is the first local - * binding. - * - * The first local binding may need to be propagated. - */ - bool addOrigin(const std::string& origin) { + /** Returns true if propagation is needed. */ + bool addOrigin(const std::string& queueName, const std::string& origin) { if (origin.empty()) { localBindings++; return localBindings == 1; } - originSet.insert(origin); + fedBindings[queueName].insert(origin); return true; } - bool delOrigin(const std::string& origin) { - originSet.erase(origin); + + /** Returns true if propagation is needed. */ + bool delOrigin(const std::string& queueName, const std::string& origin){ + fedBindings[queueName].erase(origin); return true; } - /** - * Returns 'true' if and only if the last local binding is - * deleted. - * - * When the last local binding is deleted, it may need to - * be propagated. - */ + /** Returns true if propagation is needed. */ bool delOrigin() { if (localBindings > 0) localBindings--; return localBindings == 0; } + uint32_t count() { - return localBindings + originSet.size(); + return localBindings + fedBindings.size(); + } + + uint32_t countFedBindings(const std::string& queueName) { + return fedBindings[queueName].size(); } }; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index a33eba1d09..ac2c914a97 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { binding->startManagement(); - propagate = fedBinding.addOrigin(fedOrigin); + propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { // queue already present - still need to track fedOrigin - fedBinding.addOrigin(fedOrigin); + fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } else if (fedOp == fedOpUnbind) { - propagate = fedBinding.delOrigin(fedOrigin); - if (fedBinding.count() == 0) + propagate = fedBinding.delOrigin(queue->getName(), fedOrigin); + if (fedBinding.countFedBindings(queue->getName()) == 0) unbind(queue, "", 0); } else if (fedOp == fedOpReorigin) { if (fedBinding.hasLocal()) { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 584cd4c481..82ac5911ee 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co BoundKey bk(binding); if (bindings.add_unless(bk, MatchArgs(queue, args))) { binding->startManagement(); - propagate = bk.fedBinding.addOrigin(fedOrigin); + propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { - bk.fedBinding.addOrigin(fedOrigin); + bk.fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } // lock dropped @@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co } else if (fedOp == fedOpUnbind) { Mutex::ScopedLock l(lock); - FedUnbindModifier modifier(fedOrigin); + FedUnbindModifier modifier(queue->getName(), fedOrigin); bindings.modify_if(MatchKey(queue, bindingKey), modifier); propagate = modifier.shouldPropagate; if (modifier.shouldUnbind) { @@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator()(BoundKey & bk) } //---------- -HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {} +HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName, const string& origin) : queueName(queueName), fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {} HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), shouldPropagate(false) {} bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) @@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) if ("" == fedOrigin) { shouldPropagate = bk.fedBinding.delOrigin(); } else { - shouldPropagate = bk.fedBinding.delOrigin(fedOrigin); + shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin); } - if (bk.fedBinding.count() == 0) + if (bk.fedBinding.countFedBindings(queueName) == 0) { shouldUnbind = true; } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 33c119cbbb..3b939d6851 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -60,11 +60,12 @@ class HeadersExchange : public virtual Exchange { struct FedUnbindModifier { + std::string queueName; std::string fedOrigin; bool shouldUnbind; bool shouldPropagate; FedUnbindModifier(); - FedUnbindModifier(std::string & origin); + FedUnbindModifier(const std::string& queueName, const std::string& origin); bool operator()(BoundKey & bk); }; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6bc42e28bf..1b0fe71bcf 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons for (q = qv.begin(); q != qv.end(); q++) { if ((*q)->queue == queue) { // already bound, but may be from a different fedOrigin - bk->fedBinding.addOrigin(fedOrigin); + bk->fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } @@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons binding->startManagement(); bk->bindingVector.push_back(binding); nBindings++; - propagate = bk->fedBinding.addOrigin(fedOrigin); + propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } @@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons RWlock::ScopedWlock l(lock); BindingKey* bk = bindingTree.getBindingKey(routingPattern); if (bk) { - propagate = bk->fedBinding.delOrigin(fedOrigin); - reallyUnbind = bk->fedBinding.count() == 0; + propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0; } } if (reallyUnbind) |
