From b171cc419ae5d2bc747ec2465ad1c76445f8bd37 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 11 Sep 2009 13:33:42 +0000 Subject: Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@813825 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/xml/XmlExchange.cpp | 63 +++++++++++++--------------------------- 1 file changed, 20 insertions(+), 43 deletions(-) (limited to 'cpp/src/qpid/xml/XmlExchange.cpp') diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index a41c8840ff..253b9ff8d0 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,7 +69,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, // #### TODO: The Binding should take the query text // #### only. Consider encapsulating the entire block, including // #### the if condition. - + bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) { @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { binding->parse_message_content = true; break; - } + } } } @@ -129,11 +129,11 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons } return true; } else { - return false; + return false; } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { string msgContent; @@ -151,12 +151,12 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); // This will parse the document using either Xerces or FastXDM, depending // on your XQilla configuration. FastXDM can be as much as 10x faster. - + Sequence seq(context->parseDocument(xml)); if(!seq.isEmpty() && seq.first()->isNode()) { @@ -206,49 +206,26 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; - { + qpid::sys::CopyOnWriteArray::Ptr b(new std::vector >); + { RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p) return; - } - int count(0); + p = bindingsMap[routingKey].snapshot(); + if (!p.get()) return; + } for (std::vector::const_iterator i = p->begin(); i != p->end(); i++) { - if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { - msg.deliverTo((*i)->queue); - count++; - QPID_LOG(trace, "Delivered to queue" ); - - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { + b->push_back(*i); } - } - if (!count) { - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } + } + doRoute(msg, b); } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } - - } -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { RWlock::ScopedRlock l(lock); if (routingKey) { @@ -274,12 +251,12 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe } -XmlExchange::~XmlExchange() +XmlExchange::~XmlExchange() { bindingsMap.clear(); } const std::string XmlExchange::typeName("xml"); - + } } -- cgit v1.2.1