diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 556 |
1 files changed, 437 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 3f70f17ea4..6bc42e28bf 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -38,12 +38,82 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. -namespace { +namespace +{ +const std::string STAR("*"); +const std::string HASH("#"); +} + +// iterator for federation ReOrigin bind operation +class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator { +public: + ReOriginIter() {}; + ~ReOriginIter() {}; + bool visit(BindingNode& node) { + if (node.bindings.fedBinding.hasLocal()) { + keys2prop.push_back(node.routePattern); + } + return true; + } + std::vector<std::string> keys2prop; +}; + + +// match iterator used by route(): builds BindingList of all unique queues +// that match the routing key. +class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator { +public: + BindingsFinderIter(BindingList &bl) : b(bl) {}; + ~BindingsFinderIter() {}; + + BindingList& b; + std::set<std::string> qSet; + + bool visit(BindingNode& node) { + + Binding::vector& qv(node.bindings.bindingVector); + for (Binding::vector::iterator j = qv.begin(); j != qv.end(); j++) { + // do not duplicate queues on the binding list + if (qSet.insert(j->get()->queue->getName()).second) { + b->push_back(*j); + } + } + return true; + } +}; + + + +// Iterator to visit all bindings until a given queue is found +class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator { +public: + QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {}; + ~QueueFinderIter() {}; + bool visit(BindingNode& node) { + + Binding::vector& qv(node.bindings.bindingVector); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) { + if ((*q)->queue == queue) { + found = true; + return false; // search done + } + } + return true; // continue search + } + + Queue::shared_ptr queue; + bool found; +}; + + // Iterate over a string of '.'-separated tokens. -struct TokenIterator { +struct TopicExchange::TokenIterator { typedef pair<const char*,const char*> Token; - TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {} + TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {} + + TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {} bool finished() const { return !token.first; } @@ -56,23 +126,39 @@ struct TokenIterator { } } + void pop(string &top) { + ptrdiff_t l = len(); + if (l) { + top.assign(token.first, l); + } else top.clear(); + next(); + } + bool match1(char c) const { return token.second==token.first+1 && *token.first == c; } - bool match(const Token& token2) { + bool match(const Token& token2) const { ptrdiff_t l=len(); return l == token2.second-token2.first && strncmp(token.first, token2.first, l) == 0; } + bool match(const string& str) const { + ptrdiff_t l=len(); + return l == ptrdiff_t(str.size()) && + str.compare(0, l, token.first, l) == 0; + } + ptrdiff_t len() const { return token.second - token.first; } - Token token; + const char* end; + Token token; }; -class Normalizer : public TokenIterator { + +class TopicExchange::Normalizer : public TopicExchange::TokenIterator { public: Normalizer(string& p) : TokenIterator(&p[0], &p[0]+p.size()), pattern(p) @@ -106,54 +192,7 @@ class Normalizer : public TokenIterator { string& pattern; }; -class Matcher { - public: - Matcher(const string& p, const string& k) - : matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size()) - { matched = match(); } - - operator bool() const { return matched; } - - private: - Matcher(const char* bp, const char* ep, const char* bk, const char* ek) - : matched(), pattern(bp,ep), key(bk,ek) { matched = match(); } - - bool match() { - // Invariant: pattern and key match up to but excluding - // pattern.token and key.token - while (!pattern.finished() && !key.finished()) { - if (pattern.match1('*') && !key.finished()) { - pattern.next(); - key.next(); - } - else if (pattern.match1('#')) { - pattern.next(); - if (pattern.finished()) return true; // Trailing # matches anything. - while (!key.finished()) { - if (Matcher(pattern.token.first, pattern.end, - key.token.first, key.end)) - return true; - key.next(); - } - return false; - } - else if (pattern.len() == key.len() && - equal(pattern.token.first,pattern.token.second,key.token.first)) { - pattern.next(); - key.next(); - } - else - return false; - } - if (!pattern.finished() && pattern.match1('#')) - pattern.next(); // Trailing # matches empty. - return pattern.finished() && key.finished(); - } - bool matched; - TokenIterator pattern, key; -}; -} // Convert sequences of * and # to a sequence of * followed by a single # string TopicExchange::normalize(const string& pattern) { @@ -162,12 +201,10 @@ string TopicExchange::normalize(const string& pattern) { return normal; } -bool TopicExchange::match(const string& pattern, const string& key) -{ - return Matcher(pattern, key); -} -TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) +TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) + : Exchange(_name, _parent, b), + nBindings(0) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -175,7 +212,8 @@ TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b) + Exchange(_name, _durable, _args, _parent, b), + nBindings(0) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -187,22 +225,27 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons string fedTags(args ? args->getAsString(qpidFedTags) : ""); string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; - bool reallyUnbind; string routingPattern = normalize(routingKey); if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); - if (isBound(queue, routingPattern)) { - // already bound, but may be from a different fedOrigin - BoundKey& bk = bindings[routingPattern]; - bk.fedBinding.addOrigin(fedOrigin); - return false; - } else { + BindingKey *bk = bindingTree.addBindingKey(routingPattern); + if (bk) { + Binding::vector& qv(bk->bindingVector); + Binding::vector::iterator q; + for (q = qv.begin(); q != qv.end(); q++) { + if ((*q)->queue == queue) { + // already bound, but may be from a different fedOrigin + bk->fedBinding.addOrigin(fedOrigin); + return false; + } + } + Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); binding->startManagement(); - BoundKey& bk = bindings[routingPattern]; - bk.bindingVector.push_back(binding); - propagate = bk.fedBinding.addOrigin(fedOrigin); + bk->bindingVector.push_back(binding); + nBindings++; + propagate = bk->fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } @@ -210,11 +253,14 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons << " (origin=" << fedOrigin << ")"); } } else if (fedOp == fedOpUnbind) { + bool reallyUnbind = false; { RWlock::ScopedWlock l(lock); - BoundKey& bk = bindings[routingPattern]; - propagate = bk.fedBinding.delOrigin(fedOrigin); - reallyUnbind = bk.fedBinding.count() == 0; + BindingKey* bk = bindingTree.getBindingKey(routingPattern); + if (bk) { + propagate = bk->fedBinding.delOrigin(fedOrigin); + reallyUnbind = bk->fedBinding.count() == 0; + } } if (reallyUnbind) unbind(queue, routingPattern, 0); @@ -223,20 +269,14 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons * while holding the lock. Then propagate once the lock is * released */ - std::vector<std::string> keys2prop; + ReOriginIter reOriginIter; { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator iter = bindings.begin(); - iter != bindings.end(); iter++) { - const BoundKey& bk = iter->second; - - if (bk.fedBinding.hasLocal()) { - keys2prop.push_back(iter->first); - } - } + RWlock::ScopedRlock l(lock); + bindingTree.iterateAll( reOriginIter ); } /* lock dropped */ - for (std::vector<std::string>::const_iterator key = keys2prop.begin(); - key != keys2prop.end(); key++) { + + for (std::vector<std::string>::const_iterator key = reOriginIter.keys2prop.begin(); + key != reOriginIter.keys2prop.end(); key++) { propagateFedOp( *key, string(), fedOpBind, string()); } } @@ -250,11 +290,9 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); string routingKey = normalize(constRoutingKey); - - BindingMap::iterator bi = bindings.find(routingKey); - if (bi == bindings.end()) return false; - BoundKey& bk = bi->second; - Binding::vector& qv(bk.bindingVector); + BindingKey* bk = bindingTree.getBindingKey(routingKey); + if (!bk) return false; + Binding::vector& qv(bk->bindingVector); bool propagate = false; Binding::vector::iterator q; @@ -263,8 +301,12 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe break; if(q == qv.end()) return false; qv.erase(q); - propagate = bk.fedBinding.delOrigin(); - if(qv.empty()) bindings.erase(bi); + assert(nBindings > 0); + nBindings--; + propagate = bk->fedBinding.delOrigin(); + if(qv.empty()) { + bindingTree.removeBindingKey(routingKey); + } if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } @@ -277,9 +319,10 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) { - BindingMap::iterator bi = bindings.find(pattern); - if (bi == bindings.end()) return false; - Binding::vector& qv(bi->second.bindingVector); + // Note well: lock held by caller.... + BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern + if (!bk) return false; + Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) @@ -289,22 +332,13 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { + // Note: PERFORMANCE CRITICAL!!! BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); - std::set<std::string> qSet; + BindingsFinderIter bindingsFinder(b); { RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ - // do not duplicate queues on the binding list - if (qSet.insert(j->get()->queue->getName()).second) { - b->push_back(*j); - } - } - } - } + bindingTree.iterateMatch(routingKey, bindingsFinder); } doRoute(msg, b); } @@ -316,27 +350,311 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing string key(normalize(*routingKey)); return isBound(queue, key); } else if (!routingKey && !queue) { - return bindings.size() > 0; + return nBindings > 0; } else if (routingKey) { - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *routingKey)) - return true; - } - } else { - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Binding::vector& qv(i->second.bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - return true; + if (bindingTree.getBindingKey(*routingKey)) { + return true; } + } else { + QueueFinderIter queueFinder(queue); + bindingTree.iterateAll( queueFinder ); + return queueFinder.found; } return false; - return queue && routingKey; } TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); +// +// class BindingNode +// + +TopicExchange::BindingNode::~BindingNode() +{ + childTokens.clear(); +} + + +// Add a binding pattern to the tree. Return a pointer to the binding key +// of the node that matches the binding pattern. +TopicExchange::BindingKey* +TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute) +{ + TokenIterator bKey(normalizedRoute); + return addBindingKey(bKey, normalizedRoute); +} + + +// Return a pointer to the binding key of the leaf node that matches the binding pattern. +TopicExchange::BindingKey* +TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute) +{ + TokenIterator bKey(normalizedRoute); + return getBindingKey(bKey); +} + + +// Delete the binding associated with the given route. +void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute) +{ + TokenIterator bKey2(normalizedRoute); + removeBindingKey(bKey2, normalizedRoute); +} + +// visit each node in the tree. Note: all nodes are visited, +// even non-leaf nodes (i.e. nodes without any bindings) +bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter) +{ + if (!iter.visit(*this)) return false; + + if (starChild && !starChild->iterateAll(iter)) return false; + + if (hashChild && !hashChild->iterateAll(iter)) return false; + + for (ChildMap::iterator ptr = childTokens.begin(); + ptr != childTokens.end(); ptr++) { + + if (!ptr->second->iterateAll(iter)) return false; + } + + return true; +} + +// applies iter against only matching nodes until iter returns false +// Note Well: the iter may match against the same node more than once +// if # wildcards are present! +bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter) +{ + TopicExchange::TokenIterator rKey(routingKey); + return iterateMatch( rKey, iter ); +} + + +// recurse over binding using token iterator. +// Note well: bkey is modified! +TopicExchange::BindingKey* +TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey, + const string& fullPattern) +{ + if (bKey.finished()) { + // this node's binding + if (routePattern.empty()) { + routePattern = fullPattern; + } else assert(routePattern == fullPattern); + + return &bindings; + + } else { + // pop the topmost token & recurse... + + if (bKey.match(STAR)) { + if (!starChild) { + starChild.reset(new StarNode()); + } + bKey.next(); + return starChild->addBindingKey(bKey, fullPattern); + + } else if (bKey.match(HASH)) { + if (!hashChild) { + hashChild.reset(new HashNode()); + } + bKey.next(); + return hashChild->addBindingKey(bKey, fullPattern); + + } else { + ChildMap::iterator ptr; + std::string next_token; + bKey.pop(next_token); + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->addBindingKey(bKey, fullPattern); + } else { + BindingNode::shared_ptr child(new BindingNode(next_token)); + childTokens[next_token] = child; + return child->addBindingKey(bKey, fullPattern); + } + } + } +} + + +// Remove a binding pattern from the tree. Return true if the current +// node becomes a leaf without any bindings (therefore can be deleted). +// Note Well: modifies parameter bKey's value! +bool +TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey, + const string& fullPattern) +{ + bool remove; + + if (!bKey.finished()) { + + if (bKey.match(STAR)) { + bKey.next(); + if (starChild) { + remove = starChild->removeBindingKey(bKey, fullPattern); + if (remove) { + starChild.reset(); + } + } + } else if (bKey.match(HASH)) { + bKey.next(); + if (hashChild) { + remove = hashChild->removeBindingKey(bKey, fullPattern); + if (remove) { + hashChild.reset(); + } + } + } else { + ChildMap::iterator ptr; + std::string next_token; + bKey.pop(next_token); + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + remove = ptr->second->removeBindingKey(bKey, fullPattern); + if (remove) { + childTokens.erase(ptr); + } + } + } + } + + // no bindings and no children == parent can delete this node. + return getChildCount() == 0 && bindings.bindingVector.empty(); +} + + +// find the binding key that matches the given binding pattern. +// Note Well: modifies key parameter! +TopicExchange::BindingKey* +TopicExchange::BindingNode::getBindingKey(TokenIterator &key) +{ + if (key.finished()) { + return &bindings; + } + + string next_token; + + key.pop(next_token); + + if (next_token == STAR) { + if (starChild) + return starChild->getBindingKey(key); + } else if (next_token == HASH) { + if (hashChild) + return hashChild->getBindingKey(key); + } else { + ChildMap::iterator ptr; + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->getBindingKey(key); + } + } + + return 0; +} + + + +// iterate over all nodes that match the given key. Note well: the set of nodes +// that are visited includes matching non-leaf nodes. +// Note well: parameter key is modified! +bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter) +{ + // invariant: key has matched all previous tokens up to this node. + if (key.finished()) { + // exact match this node: visit if bound + if (!bindings.bindingVector.empty()) + if (!iter.visit(*this)) return false; + } + + // check remaining key against children, even if empty. + return iterateMatchChildren(key, iter); +} + + +TopicExchange::StarNode::StarNode() + : BindingNode(STAR) {} + + +// See iterateMatch() above. +// Special case: this node must verify a token is available (match exactly one). +bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter) +{ + // must match one token: + if (key.finished()) + return true; // match failed, but continue iteration on siblings + + // pop the topmost token + key.next(); + + if (key.finished()) { + // exact match this node: visit if bound + if (!bindings.bindingVector.empty()) + if (!iter.visit(*this)) return false; + } + + return iterateMatchChildren(key, iter); +} + + +TopicExchange::HashNode::HashNode() + : BindingNode(HASH) {} + + +// See iterateMatch() above. +// Special case: can match zero or more tokens at the head of the key. +bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter) +{ + // consume each token and look for a match on the + // remaining key. + while (!key.finished()) { + if (!iterateMatchChildren(key, iter)) return false; + key.next(); + } + + if (!bindings.bindingVector.empty()) + return iter.visit(*this); + + return true; +} + + +// helper: iterate over current node's matching children +bool +TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key, + TopicExchange::BindingNode::TreeIterator& iter) +{ + // always try glob - it can match empty keys + if (hashChild) { + TokenIterator tmp(key); + if (!hashChild->iterateMatch(tmp, iter)) + return false; + } + + if (!key.finished()) { + + if (starChild) { + TokenIterator tmp(key); + if (!starChild->iterateMatch(tmp, iter)) + return false; + } + + if (!childTokens.empty()) { + TokenIterator newKey(key); + std::string next_token; + newKey.pop(next_token); + + ChildMap::iterator ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->iterateMatch(newKey, iter); + } + } + } + + return true; +} + }} // namespace qpid::broker |
