diff options
Diffstat (limited to 'cpp/src/qpid/broker/XmlExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp new file mode 100644 index 0000000000..8577e9211c --- /dev/null +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "XmlExchange.h" + +#include "DeliverableMessage.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" + +#include <xercesc/framework/MemBufInputSource.hpp> + +#include <xqilla/context/ItemFactory.hpp> +#include <xqilla/xqilla-simple.hpp> + +#include <iostream> +#include <sstream> + +using namespace qpid::framing; +using namespace qpid::sys; +using qpid::management::Manageable; + +namespace qpid { +namespace broker { + +XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +XmlExchange::XmlExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +/* + * Use the name of the query as the binding key. + * + * The first time a given name is used in a binding, the query body + * must be provided.After that, no query body should be present. + * + * To modify an installed query, the user must first unbind the + * existing query, then replace it by binding again with the same + * name. + * + */ + + // #### TODO: The Binding should take the query text + // #### only. Consider encapsulating the entire block, including + // #### the if condition. + + +bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) +{ + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + + string queryText = bindingArguments->getString("xquery"); + + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i == bindings.end()) { + + try { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + XmlBinding::vector bindings(1, binding); + bindingsMap[routingKey] = bindings; + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } + return true; + } else{ + return false; + } +} + +bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +{ + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i < bindings.end()) { + bindings.erase(i); + if (bindings.empty()) { + bindingsMap.erase(routingKey); + } + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } + return true; + } else { + return false; + } +} + +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) +{ + // ### TODO: Need istream for frameset + // Hack alert - the following code does not work for really large messages + + string msgContent; + msg.getMessage().getFrames().getContent(msgContent); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + + try { + XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" ); + Sequence seq(context->parseDocument(xml)); + + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + Result result = query->execute(context.get()); + return result->getEffectiveBooleanValue(context.get(), 0); + } + catch (XQException& e) { + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + } + return 0; +} + +void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) +{ + RWlock::ScopedRlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + int count(0); + + for (i = bindings.begin(); i != bindings.end(); i++, count++) { + + if (matches((*i)->xquery, msg, args)) { + msg.deliverTo((*i)->queue); + + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } + + if(!count){ + QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + +} + + +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +{ + XmlBinding::vector::iterator j; + + if (routingKey) { + XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + + if (i == bindingsMap.end()) + return false; + if (!queue) + return true; + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; + } else if (!queue) { + //if no queue or routing key is specified, just report whether any bindings exist + return bindingsMap.size() > 0; + } else { + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; + return false; + } + + return false; +} + + +XmlExchange::~XmlExchange() +{ + bindingsMap.clear(); +} + +const std::string XmlExchange::typeName("xml"); + +} +} |