diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 692 |
1 files changed, 0 insertions, 692 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp deleted file mode 100644 index 644a3d628e..0000000000 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ /dev/null @@ -1,692 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/TopicExchange.h" -#include "qpid/broker/FedOps.h" -#include "qpid/log/Statement.h" -#include <algorithm> - - -namespace qpid { -namespace broker { - -using namespace qpid::framing; -using namespace qpid::sys; -using namespace std; -namespace _qmf = qmf::org::apache::qpid::broker; - - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -namespace -{ -const std::string STAR("*"); -const std::string HASH("#"); -} - -// iterator for federation ReOrigin bind operation -class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator { -public: - ReOriginIter() {}; - ~ReOriginIter() {}; - bool visit(BindingNode& node) { - if (node.bindings.fedBinding.hasLocal()) { - keys2prop.push_back(node.routePattern); - } - return true; - } - std::vector<std::string> keys2prop; -}; - - -// match iterator used by route(): builds BindingList of all unique queues -// that match the routing key. -class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator { -public: - BindingsFinderIter(BindingList &bl) : b(bl) {}; - ~BindingsFinderIter() {}; - - BindingList& b; - std::set<std::string> qSet; - - bool visit(BindingNode& node) { - - Binding::vector& qv(node.bindings.bindingVector); - for (Binding::vector::iterator j = qv.begin(); j != qv.end(); j++) { - // do not duplicate queues on the binding list - if (qSet.insert(j->get()->queue->getName()).second) { - b->push_back(*j); - } - } - return true; - } -}; - - - -// Iterator to visit all bindings until a given queue is found -class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator { -public: - QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {}; - ~QueueFinderIter() {}; - bool visit(BindingNode& node) { - - Binding::vector& qv(node.bindings.bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) { - if ((*q)->queue == queue) { - found = true; - return false; // search done - } - } - return true; // continue search - } - - Queue::shared_ptr queue; - bool found; -}; - - -// Iterate over a string of '.'-separated tokens. -struct TopicExchange::TokenIterator { - typedef pair<const char*,const char*> Token; - - TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {} - - TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {} - - bool finished() const { return !token.first; } - - void next() { - if (token.second == end) - token.first = token.second = 0; - else { - token.first=token.second+1; - token.second=(find(token.first, end, '.')); - } - } - - void pop(string &top) { - ptrdiff_t l = len(); - if (l) { - top.assign(token.first, l); - } else top.clear(); - next(); - } - - bool match1(char c) const { - return token.second==token.first+1 && *token.first == c; - } - - bool match(const Token& token2) const { - ptrdiff_t l=len(); - return l == token2.second-token2.first && - strncmp(token.first, token2.first, l) == 0; - } - - bool match(const string& str) const { - ptrdiff_t l=len(); - return l == ptrdiff_t(str.size()) && - str.compare(0, l, token.first, l) == 0; - } - - ptrdiff_t len() const { return token.second - token.first; } - - - const char* end; - Token token; -}; - - -class TopicExchange::Normalizer : public TopicExchange::TokenIterator { - public: - Normalizer(string& p) - : TokenIterator(&p[0], &p[0]+p.size()), pattern(p) - { normalize(); } - - private: - // Apply 2 transformations: #.* -> *.# and #.# -> # - void normalize() { - while (!finished()) { - if (match1('#')) { - const char* hash1=token.first; - next(); - if (!finished()) { - if (match1('#')) { // Erase #.# -> # - pattern.erase(hash1-pattern.data(), 2); - token.first -= 2; - token.second -= 2; - end -= 2; - } - else if (match1('*')) { // Swap #.* -> *.# - swap(*const_cast<char*>(hash1), - *const_cast<char*>(token.first)); - } - } - } - else - next(); - } - } - - string& pattern; -}; - - - -// Convert sequences of * and # to a sequence of * followed by a single # -string TopicExchange::normalize(const string& pattern) { - string normal(pattern); - Normalizer n(normal); - return normal; -} - - -TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) - : Exchange(_name, _parent, b), - nBindings(0) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -TopicExchange::TopicExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b), - nBindings(0) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) -{ - ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. - string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); - string fedTags(args ? args->getAsString(qpidFedTags) : ""); - string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); - bool propagate = false; - string routingPattern = normalize(routingKey); - - if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { - RWlock::ScopedWlock l(lock); - BindingKey *bk = bindingTree.addBindingKey(routingPattern); - if (bk) { - Binding::vector& qv(bk->bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) { - if ((*q)->queue == queue) { - // already bound, but may be from a different fedOrigin - bk->fedBinding.addOrigin(queue->getName(), fedOrigin); - return false; - } - } - - Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); - binding->startManagement(); - bk->bindingVector.push_back(binding); - nBindings++; - propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - } - QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName() - << " on exchange " << getName() << " (origin=" << fedOrigin << ")"); - } - } else if (fedOp == fedOpUnbind) { - RWlock::ScopedWlock l(lock); - BindingKey* bk = getQueueBinding(queue, routingPattern); - if (bk) { - QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName() - << " on queue=" << queue->getName() << " origin=" << fedOrigin); - propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); - // if this was the last binding for the queue, delete the binding - if (bk->fedBinding.countFedBindings(queue->getName()) == 0) { - deleteBinding(queue, routingPattern, bk); - } - } - } else if (fedOp == fedOpReorigin) { - /** gather up all the keys that need rebinding in a local vector - * while holding the lock. Then propagate once the lock is - * released - */ - ReOriginIter reOriginIter; - { - RWlock::ScopedRlock l(lock); - bindingTree.iterateAll( reOriginIter ); - } /* lock dropped */ - - for (std::vector<std::string>::const_iterator key = reOriginIter.keys2prop.begin(); - key != reOriginIter.keys2prop.end(); key++) { - propagateFedOp( *key, string(), fedOpBind, string()); - } - } - - cc.clearCache(); // clear the cache before we IVE route. - routeIVE(); - if (propagate) - propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); - return true; -} - -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args) -{ - string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); - QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName() - << " on exchange " << getName() << " origin=" << fedOrigin << ")" ); - - ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. - RWlock::ScopedWlock l(lock); - string routingKey = normalize(constRoutingKey); - BindingKey* bk = getQueueBinding(queue, routingKey); - if (!bk) return false; - bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); - deleteBinding(queue, routingKey, bk); - if (propagate) - propagateFedOp(routingKey, string(), fedOpUnbind, string()); - return true; -} - - -bool TopicExchange::deleteBinding(Queue::shared_ptr queue, - const std::string& routingKey, - BindingKey *bk) -{ - // Note well: write lock held by caller - Binding::vector& qv(bk->bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - break; - if(q == qv.end()) return false; - qv.erase(q); - assert(nBindings > 0); - nBindings--; - - if(qv.empty()) { - bindingTree.removeBindingKey(routingKey); - } - if (mgmtExchange != 0) { - mgmtExchange->dec_bindingCount(); - } - QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName() - << " on exchange " << getName()); - return true; -} - -/** returns a pointer to the BindingKey if the given queue is bound to this - * exchange using the routing pattern. 0 if queue binding does not exist. - */ -TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern) -{ - // Note well: lock held by caller.... - BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern - if (!bk) return 0; - Binding::vector& qv(bk->bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - break; - return (q != qv.end()) ? bk : 0; -} - -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) -{ - // Note: PERFORMANCE CRITICAL!!! - BindingList b; - std::map<std::string, BindingList>::iterator it; - { // only lock the cache for read - RWlock::ScopedRlock cl(cacheLock); - it = bindingCache.find(routingKey); - if (it != bindingCache.end()) { - b = it->second; - } - } - PreRoute pr(msg, this); - if (!b.get()) // no cache hit - { - RWlock::ScopedRlock l(lock); - b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); - BindingsFinderIter bindingsFinder(b); - bindingTree.iterateMatch(routingKey, bindingsFinder); - RWlock::ScopedWlock cwl(cacheLock); - bindingCache[routingKey] = b; // update cache - } - doRoute(msg, b); -} - -bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) -{ - RWlock::ScopedRlock l(lock); - if (routingKey && queue) { - string key(normalize(*routingKey)); - return getQueueBinding(queue, key) != 0; - } else if (!routingKey && !queue) { - return nBindings > 0; - } else if (routingKey) { - if (bindingTree.getBindingKey(*routingKey)) { - return true; - } - } else { - QueueFinderIter queueFinder(queue); - bindingTree.iterateAll( queueFinder ); - return queueFinder.found; - } - return false; -} - -TopicExchange::~TopicExchange() {} - -const std::string TopicExchange::typeName("topic"); - -// -// class BindingNode -// - -TopicExchange::BindingNode::~BindingNode() -{ - childTokens.clear(); -} - - -// Add a binding pattern to the tree. Return a pointer to the binding key -// of the node that matches the binding pattern. -TopicExchange::BindingKey* -TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey(normalizedRoute); - return addBindingKey(bKey, normalizedRoute); -} - - -// Return a pointer to the binding key of the leaf node that matches the binding pattern. -TopicExchange::BindingKey* -TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey(normalizedRoute); - return getBindingKey(bKey); -} - - -// Delete the binding associated with the given route. -void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey2(normalizedRoute); - removeBindingKey(bKey2, normalizedRoute); -} - -// visit each node in the tree. Note: all nodes are visited, -// even non-leaf nodes (i.e. nodes without any bindings) -bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter) -{ - if (!iter.visit(*this)) return false; - - if (starChild && !starChild->iterateAll(iter)) return false; - - if (hashChild && !hashChild->iterateAll(iter)) return false; - - for (ChildMap::iterator ptr = childTokens.begin(); - ptr != childTokens.end(); ptr++) { - - if (!ptr->second->iterateAll(iter)) return false; - } - - return true; -} - -// applies iter against only matching nodes until iter returns false -// Note Well: the iter may match against the same node more than once -// if # wildcards are present! -bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter) -{ - TopicExchange::TokenIterator rKey(routingKey); - return iterateMatch( rKey, iter ); -} - - -// recurse over binding using token iterator. -// Note well: bkey is modified! -TopicExchange::BindingKey* -TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey, - const string& fullPattern) -{ - if (bKey.finished()) { - // this node's binding - if (routePattern.empty()) { - routePattern = fullPattern; - } else assert(routePattern == fullPattern); - - return &bindings; - - } else { - // pop the topmost token & recurse... - - if (bKey.match(STAR)) { - if (!starChild) { - starChild.reset(new StarNode()); - } - bKey.next(); - return starChild->addBindingKey(bKey, fullPattern); - - } else if (bKey.match(HASH)) { - if (!hashChild) { - hashChild.reset(new HashNode()); - } - bKey.next(); - return hashChild->addBindingKey(bKey, fullPattern); - - } else { - ChildMap::iterator ptr; - std::string next_token; - bKey.pop(next_token); - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->addBindingKey(bKey, fullPattern); - } else { - BindingNode::shared_ptr child(new BindingNode(next_token)); - childTokens[next_token] = child; - return child->addBindingKey(bKey, fullPattern); - } - } - } -} - - -// Remove a binding pattern from the tree. Return true if the current -// node becomes a leaf without any bindings (therefore can be deleted). -// Note Well: modifies parameter bKey's value! -bool -TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey, - const string& fullPattern) -{ - bool remove; - - if (!bKey.finished()) { - - if (bKey.match(STAR)) { - bKey.next(); - if (starChild) { - remove = starChild->removeBindingKey(bKey, fullPattern); - if (remove) { - starChild.reset(); - } - } - } else if (bKey.match(HASH)) { - bKey.next(); - if (hashChild) { - remove = hashChild->removeBindingKey(bKey, fullPattern); - if (remove) { - hashChild.reset(); - } - } - } else { - ChildMap::iterator ptr; - std::string next_token; - bKey.pop(next_token); - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - remove = ptr->second->removeBindingKey(bKey, fullPattern); - if (remove) { - childTokens.erase(ptr); - } - } - } - } - - // no bindings and no children == parent can delete this node. - return getChildCount() == 0 && bindings.bindingVector.empty(); -} - - -// find the binding key that matches the given binding pattern. -// Note Well: modifies key parameter! -TopicExchange::BindingKey* -TopicExchange::BindingNode::getBindingKey(TokenIterator &key) -{ - if (key.finished()) { - return &bindings; - } - - string next_token; - - key.pop(next_token); - - if (next_token == STAR) { - if (starChild) - return starChild->getBindingKey(key); - } else if (next_token == HASH) { - if (hashChild) - return hashChild->getBindingKey(key); - } else { - ChildMap::iterator ptr; - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->getBindingKey(key); - } - } - - return 0; -} - - - -// iterate over all nodes that match the given key. Note well: the set of nodes -// that are visited includes matching non-leaf nodes. -// Note well: parameter key is modified! -bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // invariant: key has matched all previous tokens up to this node. - if (key.finished()) { - // exact match this node: visit if bound - if (!bindings.bindingVector.empty()) - if (!iter.visit(*this)) return false; - } - - // check remaining key against children, even if empty. - return iterateMatchChildren(key, iter); -} - - -TopicExchange::StarNode::StarNode() - : BindingNode(STAR) {} - - -// See iterateMatch() above. -// Special case: this node must verify a token is available (match exactly one). -bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // must match one token: - if (key.finished()) - return true; // match failed, but continue iteration on siblings - - // pop the topmost token - key.next(); - - if (key.finished()) { - // exact match this node: visit if bound - if (!bindings.bindingVector.empty()) - if (!iter.visit(*this)) return false; - } - - return iterateMatchChildren(key, iter); -} - - -TopicExchange::HashNode::HashNode() - : BindingNode(HASH) {} - - -// See iterateMatch() above. -// Special case: can match zero or more tokens at the head of the key. -bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // consume each token and look for a match on the - // remaining key. - while (!key.finished()) { - if (!iterateMatchChildren(key, iter)) return false; - key.next(); - } - - if (!bindings.bindingVector.empty()) - return iter.visit(*this); - - return true; -} - - -// helper: iterate over current node's matching children -bool -TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key, - TopicExchange::BindingNode::TreeIterator& iter) -{ - // always try glob - it can match empty keys - if (hashChild) { - TokenIterator tmp(key); - if (!hashChild->iterateMatch(tmp, iter)) - return false; - } - - if (!key.finished()) { - - if (starChild) { - TokenIterator tmp(key); - if (!starChild->iterateMatch(tmp, iter)) - return false; - } - - if (!childTokens.empty()) { - TokenIterator newKey(key); - std::string next_token; - newKey.pop(next_token); - - ChildMap::iterator ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->iterateMatch(newKey, iter); - } - } - } - - return true; -} - -}} // namespace qpid::broker |