diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 221 |
1 files changed, 130 insertions, 91 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index a465c35790..85c7a6a28e 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -21,11 +21,16 @@ #include "TopicExchange.h" #include <algorithm> -using namespace qpid::broker; + +namespace qpid { +namespace broker { + using namespace qpid::framing; using namespace qpid::sys; +using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; + // TODO aconway 2006-09-20: More efficient matching algorithm. // Areas for improvement: // - excessive string copying: should be 0 copy, match from original buffer. @@ -43,98 +48,134 @@ const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } -Tokens& Tokens::operator=(const std::string& s) { - clear(); - if (s.empty()) return *this; - std::string::const_iterator i = s.begin(); - while (true) { - // Invariant: i is at the beginning of the next untokenized word. - std::string::const_iterator j = std::find(i, s.end(), '.'); - push_back(std::string(i, j)); - if (j == s.end()) return *this; - i = j + 1; + +namespace { +// Iterate over a string of '.'-separated tokens. +struct TokenIterator { + typedef pair<const char*,const char*> Token; + + TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {} + + bool finished() const { return !token.first; } + + void next() { + if (token.second == end) + token.first = token.second = 0; + else { + token.first=token.second+1; + token.second=(find(token.first, end, '.')); + } } - return *this; -} -TopicPattern& TopicPattern::operator=(const Tokens& tokens) { - Tokens::operator=(tokens); - normalize(); - return *this; -} + bool match1(char c) const { + return token.second==token.first+1 && *token.first == c; + } -void Tokens::key(string& keytext) const -{ - for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) { - if (iter != begin()) - keytext += "."; - keytext += *iter; + bool match(const Token& token2) { + ptrdiff_t l=len(); + return l == token2.second-token2.first && + strncmp(token.first, token2.first, l) == 0; } -} -namespace { -const std::string hashmark("#"); -const std::string star("*"); -} + ptrdiff_t len() const { return token.second - token.first; } + + Token token; + const char* end; +}; -void TopicPattern::normalize() { - std::string word; - Tokens::iterator i = begin(); - while (i != end()) { - if (*i == hashmark) { - ++i; - while (i != end()) { - // Invariant: *(i-1)==#, [begin()..i-1] is normalized. - if (*i == star) { // Move * before #. - std::swap(*i, *(i-1)); - ++i; - } else if (*i == hashmark) { - erase(i); // Remove extra # - } else { - break; +class Normalizer : public TokenIterator { + public: + Normalizer(string& p) + : TokenIterator(&p[0], &p[0]+p.size()), pattern(p) + { normalize(); } + + private: + // Apply 2 transformations: #.* -> *.# and #.# -> # + void normalize() { + while (!finished()) { + if (match1('#')) { + const char* hash1=token.first; + next(); + if (!finished()) { + if (match1('#')) { // Erase #.# -> # + pattern.erase(hash1-pattern.data(), 2); + token.first -= 2; + token.second -= 2; + end -= 2; + } + else if (match1('*')) { // Swap #.* -> *.# + swap(*const_cast<char*>(hash1), + *const_cast<char*>(token.first)); + } } } - } else { - i ++; + else + next(); } } -} + string& pattern; +}; -namespace { -// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string. -// Need StringRef class that operates on a string in place witout copy. -// Should be applied everywhere strings are extracted from frames. -// -bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) -{ - // Invariant: [pattern_begin..p) matches [target_begin..t) - Tokens::const_iterator p = pattern_begin; - Tokens::const_iterator t = target_begin; - while (p != pattern_end && t != target_end) - { - if (*p == star || *p == *t) { - ++p, ++t; - } else if (*p == hashmark) { - ++p; - if (do_match(p, pattern_end, t, target_end)) return true; - while (t != target_end) { - ++t; - if (do_match(p, pattern_end, t, target_end)) return true; +class Matcher { + public: + Matcher(const string& p, const string& k) + : matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size()) + { matched = match(); } + + operator bool() const { return matched; } + + private: + Matcher(const char* bp, const char* ep, const char* bk, const char* ek) + : matched(), pattern(bp,ep), key(bk,ek) { matched = match(); } + + bool match() { + // Invariant: pattern and key match up to but excluding + // pattern.token and key.token + while (!pattern.finished() && !key.finished()) { + if (pattern.match1('*') && !key.finished()) { + pattern.next(); + key.next(); } - return false; - } else { - return false; + else if (pattern.match1('#')) { + pattern.next(); + if (pattern.finished()) return true; // Trailing # matches anything. + while (!key.finished()) { + if (Matcher(pattern.token.first, pattern.end, + key.token.first, key.end)) + return true; + key.next(); + } + return false; + } + else if (pattern.len() == key.len() && + equal(pattern.token.first,pattern.token.second,key.token.first)) { + pattern.next(); + key.next(); + } + else + return false; } + if (!pattern.finished() && pattern.match1('#')) + pattern.next(); // Trailing # matches empty. + return pattern.finished() && key.finished(); } - while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # - return t == target_end && p == pattern_end; + + bool matched; + TokenIterator pattern, key; +}; } + +// Convert sequences of * and # to a sequence of * followed by a single # +string TopicExchange::normalize(const string& pattern) { + string normal(pattern); + Normalizer n(normal); + return normal; } -bool TopicPattern::match(const Tokens& target) const +bool TopicExchange::match(const string& pattern, const string& key) { - return do_match(begin(), end(), target.begin(), target.end()); + return Matcher(pattern, key); } TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) @@ -158,14 +199,14 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; bool reallyUnbind; - TopicPattern routingPattern(routingKey); + string routingPattern = normalize(routingKey); if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); if (isBound(queue, routingPattern)) { return false; } else { - Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); BoundKey& bk = bindings[routingPattern]; bk.bindingVector.push_back(binding); propagate = bk.fedBinding.addOrigin(fedOrigin); @@ -182,15 +223,13 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons reallyUnbind = bk.fedBinding.count() == 0; } if (reallyUnbind) - unbind(queue, routingKey, 0); + unbind(queue, routingPattern, 0); } else if (fedOp == fedOpReorigin) { - for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin(); + for (BindingMap::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; if (bk.fedBinding.hasLocal()) { - string propKey; - iter->first.key(propKey); - propagateFedOp(propKey, string(), fedOpBind, string()); + propagateFedOp(iter->first, string(), fedOpBind, string()); } } } @@ -201,9 +240,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons return true; } -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ +bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); - BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); + string routingKey = normalize(constRoutingKey); + + BindingMap::iterator bi = bindings.find(routingKey); if (bi == bindings.end()) return false; BoundKey& bk = bi->second; Binding::vector& qv(bk.bindingVector); @@ -227,7 +268,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co return true; } -bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) +bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; @@ -246,10 +287,8 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel { RWlock::ScopedRlock l(lock); - Tokens tokens(routingKey); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(tokens)) { + if (match(i->first, routingKey)) { Binding::vector& qv(i->second.bindingVector); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ mb.push_back(*j); @@ -284,16 +323,15 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { if (routingKey && queue) { - TopicPattern key(*routingKey); + string key(normalize(*routingKey)); return isBound(queue, key); } else if (!routingKey && !queue) { return bindings.size() > 0; } else if (routingKey) { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(*routingKey)) { + if (match(i->first, *routingKey)) return true; } - } } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { Binding::vector& qv(i->second.bindingVector); @@ -304,10 +342,11 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing } } return false; + return queue && routingKey; } TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); - +}} // namespace qpid::broker |