summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/xml/XmlExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/xml/XmlExchange.cpp')
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp63
1 files changed, 20 insertions, 43 deletions
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index a41c8840ff..253b9ff8d0 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,7 +69,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
// #### TODO: The Binding should take the query text
// #### only. Consider encapsulating the entire block, including
// #### the if condition.
-
+
bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
{
@@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
binding->parse_message_content = true;
break;
- }
+ }
}
}
@@ -129,11 +129,11 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
}
return true;
} else {
- return false;
+ return false;
}
}
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
{
string msgContent;
@@ -151,12 +151,12 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
msgContent.length(), "input" );
// This will parse the document using either Xerces or FastXDM, depending
// on your XQilla configuration. FastXDM can be as much as 10x faster.
-
+
Sequence seq(context->parseDocument(xml));
if(!seq.isEmpty() && seq.first()->isNode()) {
@@ -206,49 +206,26 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
- {
+ qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ {
RWlock::ScopedRlock l(lock);
- p = bindingsMap[routingKey].snapshot();
- if (!p) return;
- }
- int count(0);
+ p = bindingsMap[routingKey].snapshot();
+ if (!p.get()) return;
+ }
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
- if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
- msg.deliverTo((*i)->queue);
- count++;
- QPID_LOG(trace, "Delivered to queue" );
-
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
+ b->push_back(*i);
}
- }
- if (!count) {
- QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- }
+ }
+ doRoute(msg, b);
} catch (...) {
QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}
-
-
}
-bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
RWlock::ScopedRlock l(lock);
if (routingKey) {
@@ -274,12 +251,12 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe
}
-XmlExchange::~XmlExchange()
+XmlExchange::~XmlExchange()
{
bindingsMap.clear();
}
const std::string XmlExchange::typeName("xml");
-
+
}
}