diff options
Diffstat (limited to 'cpp/src/qpid/broker/HeadersExchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 71 |
1 files changed, 59 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index dd688cdfcf..c0f6cf19d2 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -40,19 +40,40 @@ namespace { const std::string x_match("x-match"); } -HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } -HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} +HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : + Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); FieldTable::ValuePtr what = args->get(x_match); if (!what || (*what != all && *what != any)) throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); - Binding binding(*args, queue); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), binding); + Bindings::iterator i; + + for (i = bindings.begin(); i != bindings.end(); i++) + if (i->first == *args && i->second->queue == queue) + break; + if (i == bindings.end()) { - bindings.push_back(binding); + Binding::shared_ptr binding (new Binding ("", queue, this)); + HeaderMap headerMap(*args, binding); + + bindings.push_back(headerMap); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } return true; } else { return false; @@ -61,10 +82,16 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); + Bindings::iterator i; + for (i = bindings.begin(); i != bindings.end(); i++) + if (i->first == *args && i->second->queue == queue) + break; + if (i != bindings.end()) { bindings.erase(i); + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } return true; } else { return false; @@ -73,9 +100,29 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - RWlock::ScopedRlock locker(lock);; - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) msg.deliverTo(i->second); + RWlock::ScopedRlock locker(lock); + uint32_t count(0); + + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { + if (match(i->first, *args)) msg.deliverTo(i->second->queue); + if (i->second->mgmtBinding.get() != 0) + i->second->mgmtBinding->inc_msgMatched (); + } + + if (mgmtExchange.get() != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } } } @@ -83,7 +130,7 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) { + if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) { return true; } } |
