summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/HeadersExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp71
1 files changed, 59 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index dd688cdfcf..c0f6cf19d2 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -40,19 +40,40 @@ namespace {
const std::string x_match("x-match");
}
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
if (!what || (*what != all && *what != any))
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Binding binding(*args, queue);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), binding);
+ Bindings::iterator i;
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(binding);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ HeaderMap headerMap(*args, binding);
+
+ bindings.push_back(headerMap);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -61,10 +82,16 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ Bindings::iterator i;
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -73,9 +100,29 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- RWlock::ScopedRlock locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) msg.deliverTo(i->second);
+ RWlock::ScopedRlock locker(lock);
+ uint32_t count(0);
+
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
+ if (match(i->first, *args)) msg.deliverTo(i->second->queue);
+ if (i->second->mgmtBinding.get() != 0)
+ i->second->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
@@ -83,7 +130,7 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) {
+ if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
return true;
}
}