diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 84 |
1 files changed, 58 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 1b0fe71bcf..644a3d628e 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -221,6 +221,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) { + ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); string fedTags(args ? args->getAsString(qpidFedTags) : ""); string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); @@ -249,21 +250,21 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } - QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName() - << " (origin=" << fedOrigin << ")"); + QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName() + << " on exchange " << getName() << " (origin=" << fedOrigin << ")"); } } else if (fedOp == fedOpUnbind) { - bool reallyUnbind = false; - { - RWlock::ScopedWlock l(lock); - BindingKey* bk = bindingTree.getBindingKey(routingPattern); - if (bk) { - propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); - reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0; + RWlock::ScopedWlock l(lock); + BindingKey* bk = getQueueBinding(queue, routingPattern); + if (bk) { + QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName() + << " on queue=" << queue->getName() << " origin=" << fedOrigin); + propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + // if this was the last binding for the queue, delete the binding + if (bk->fedBinding.countFedBindings(queue->getName()) == 0) { + deleteBinding(queue, routingPattern, bk); } } - if (reallyUnbind) - unbind(queue, routingPattern, 0); } else if (fedOp == fedOpReorigin) { /** gather up all the keys that need rebinding in a local vector * while holding the lock. Then propagate once the lock is @@ -281,20 +282,38 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } } + cc.clearCache(); // clear the cache before we IVE route. routeIVE(); if (propagate) propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); return true; } -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){ +bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args) +{ + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName() + << " on exchange " << getName() << " origin=" << fedOrigin << ")" ); + + ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. RWlock::ScopedWlock l(lock); string routingKey = normalize(constRoutingKey); - BindingKey* bk = bindingTree.getBindingKey(routingKey); + BindingKey* bk = getQueueBinding(queue, routingKey); if (!bk) return false; - Binding::vector& qv(bk->bindingVector); - bool propagate = false; + bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + deleteBinding(queue, routingKey, bk); + if (propagate) + propagateFedOp(routingKey, string(), fedOpUnbind, string()); + return true; +} + +bool TopicExchange::deleteBinding(Queue::shared_ptr queue, + const std::string& routingKey, + BindingKey *bk) +{ + // Note well: write lock held by caller + Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) @@ -303,42 +322,55 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe qv.erase(q); assert(nBindings > 0); nBindings--; - propagate = bk->fedBinding.delOrigin(); + if(qv.empty()) { bindingTree.removeBindingKey(routingKey); } if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } - QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName()); - - if (propagate) - propagateFedOp(routingKey, string(), fedOpUnbind, string()); + QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName() + << " on exchange " << getName()); return true; } -bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) +/** returns a pointer to the BindingKey if the given queue is bound to this + * exchange using the routing pattern. 0 if queue binding does not exist. + */ +TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern) { // Note well: lock held by caller.... BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern - if (!bk) return false; + if (!bk) return 0; Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) break; - return q != qv.end(); + return (q != qv.end()) ? bk : 0; } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { // Note: PERFORMANCE CRITICAL!!! - BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + BindingList b; + std::map<std::string, BindingList>::iterator it; + { // only lock the cache for read + RWlock::ScopedRlock cl(cacheLock); + it = bindingCache.find(routingKey); + if (it != bindingCache.end()) { + b = it->second; + } + } PreRoute pr(msg, this); - BindingsFinderIter bindingsFinder(b); + if (!b.get()) // no cache hit { RWlock::ScopedRlock l(lock); + b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + BindingsFinderIter bindingsFinder(b); bindingTree.iterateMatch(routingKey, bindingsFinder); + RWlock::ScopedWlock cwl(cacheLock); + bindingCache[routingKey] = b; // update cache } doRoute(msg, b); } @@ -348,7 +380,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing RWlock::ScopedRlock l(lock); if (routingKey && queue) { string key(normalize(*routingKey)); - return isBound(queue, key); + return getQueueBinding(queue, key) != 0; } else if (!routingKey && !queue) { return nBindings > 0; } else if (routingKey) { |