diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 65 |
1 files changed, 41 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 4447790766..644a3d628e 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -250,21 +250,21 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } - QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName() - << " (origin=" << fedOrigin << ")"); + QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName() + << " on exchange " << getName() << " (origin=" << fedOrigin << ")"); } } else if (fedOp == fedOpUnbind) { - bool reallyUnbind = false; - { - RWlock::ScopedWlock l(lock); - BindingKey* bk = bindingTree.getBindingKey(routingPattern); - if (bk) { - propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); - reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0; + RWlock::ScopedWlock l(lock); + BindingKey* bk = getQueueBinding(queue, routingPattern); + if (bk) { + QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName() + << " on queue=" << queue->getName() << " origin=" << fedOrigin); + propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + // if this was the last binding for the queue, delete the binding + if (bk->fedBinding.countFedBindings(queue->getName()) == 0) { + deleteBinding(queue, routingPattern, bk); } } - if (reallyUnbind) - unbind(queue, routingPattern, 0); } else if (fedOp == fedOpReorigin) { /** gather up all the keys that need rebinding in a local vector * while holding the lock. Then propagate once the lock is @@ -289,15 +289,31 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons return true; } -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){ +bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args) +{ + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName() + << " on exchange " << getName() << " origin=" << fedOrigin << ")" ); + ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. RWlock::ScopedWlock l(lock); string routingKey = normalize(constRoutingKey); - BindingKey* bk = bindingTree.getBindingKey(routingKey); + BindingKey* bk = getQueueBinding(queue, routingKey); if (!bk) return false; - Binding::vector& qv(bk->bindingVector); - bool propagate = false; + bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + deleteBinding(queue, routingKey, bk); + if (propagate) + propagateFedOp(routingKey, string(), fedOpUnbind, string()); + return true; +} + +bool TopicExchange::deleteBinding(Queue::shared_ptr queue, + const std::string& routingKey, + BindingKey *bk) +{ + // Note well: write lock held by caller + Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) @@ -306,31 +322,32 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe qv.erase(q); assert(nBindings > 0); nBindings--; - propagate = bk->fedBinding.delOrigin(); + if(qv.empty()) { bindingTree.removeBindingKey(routingKey); } if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } - QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName()); - - if (propagate) - propagateFedOp(routingKey, string(), fedOpUnbind, string()); + QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName() + << " on exchange " << getName()); return true; } -bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) +/** returns a pointer to the BindingKey if the given queue is bound to this + * exchange using the routing pattern. 0 if queue binding does not exist. + */ +TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern) { // Note well: lock held by caller.... BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern - if (!bk) return false; + if (!bk) return 0; Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) break; - return q != qv.end(); + return (q != qv.end()) ? bk : 0; } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) @@ -363,7 +380,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing RWlock::ScopedRlock l(lock); if (routingKey && queue) { string key(normalize(*routingKey)); - return isBound(queue, key); + return getQueueBinding(queue, key) != 0; } else if (!routingKey && !queue) { return nBindings > 0; } else if (routingKey) { |
