From ca2c15b9121db502807221936bc146a4b5520234 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Wed, 15 Oct 2008 17:05:31 +0000 Subject: QPID-1341 from Jonathan - Patch applied for Jonathan - Made the following changes - added PreRoute for route() for sequencing - changed xmlexchange form struct to class - added xml.so to verify script - removed two unsed files. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704962 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.h | 1 - cpp/src/qpid/broker/ExchangeRegistry.cpp | 8 - cpp/src/qpid/broker/XmlExchange.cpp | 257 ------------------------------ cpp/src/qpid/broker/XmlExchange.h | 88 ----------- cpp/src/qpid/xml/XmlBinding.h | 37 +++++ cpp/src/qpid/xml/XmlExchange.cpp | 261 +++++++++++++++++++++++++++++++ cpp/src/qpid/xml/XmlExchange.h | 88 +++++++++++ cpp/src/qpid/xml/XmlExchangePlugin.cpp | 67 ++++++++ 8 files changed, 453 insertions(+), 354 deletions(-) delete mode 100644 cpp/src/qpid/broker/XmlExchange.cpp delete mode 100644 cpp/src/qpid/broker/XmlExchange.h create mode 100644 cpp/src/qpid/xml/XmlBinding.h create mode 100644 cpp/src/qpid/xml/XmlExchange.cpp create mode 100644 cpp/src/qpid/xml/XmlExchange.h create mode 100644 cpp/src/qpid/xml/XmlExchangePlugin.cpp (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 97a4a36eca..a7496f1510 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -129,7 +129,6 @@ class Broker : public sys::Runnable, public Plugin::Target, std::vector knownBrokers; std::vector getKnownBrokersImpl(); - public: diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 309e88e8be..7abb2ad443 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -25,9 +25,6 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" -#ifdef HAVE_XML -#include "XmlExchange.h" -#endif #include "qpid/management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" @@ -61,11 +58,6 @@ pair ExchangeRegistry::declare(const string& name, c }else if (type == ManagementExchange::typeName) { exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent)); } -#ifdef HAVE_XML - else if (type == XmlExchange::typeName) { - exchange = Exchange::shared_ptr(new XmlExchange(name, durable, args, parent)); - } -#endif else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp deleted file mode 100644 index 0ff5b2fdbf..0000000000 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "config.h" -#include "XmlExchange.h" - -#include "DeliverableMessage.h" - -#include "qpid/log/Statement.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/reply_exceptions.h" - -#include - -#include -#include - -#include -#include - -using namespace qpid::framing; -using namespace qpid::sys; -using qpid::management::Manageable; -namespace _qmf = qmf::org::apache::qpid::broker; - -namespace qpid { -namespace broker { - -XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -XmlExchange::XmlExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -/* - * Use the name of the query as the binding key. - * - * The first time a given name is used in a binding, the query body - * must be provided.After that, no query body should be present. - * - * To modify an installed query, the user must first unbind the - * existing query, then replace it by binding again with the same - * name. - * - */ - - // #### TODO: The Binding should take the query text - // #### only. Consider encapsulating the entire block, including - // #### the if condition. - - -bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) -{ - string queryText = bindingArguments->getAsString("xquery"); - - try { - RWlock::ScopedWlock l(lock); - - XmlBinding::vector& bindings(bindingsMap[routingKey]); - XmlBinding::vector::ConstPtr p = bindings.snapshot(); - if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { - Query query(xqilla.parse(X(queryText.c_str()))); - XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); - bindings.add(binding); - QPID_LOG(trace, "Bound successfully with query: " << queryText ); - - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - return true; - } else { - return false; - } - } - catch (XQException& e) { - throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); - } - catch (...) { - throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); - } -} - -bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) -{ - RWlock::ScopedWlock l(lock); - if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { - if (mgmtExchange != 0) { - mgmtExchange->dec_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); - } - return true; - } else { - return false; - } -} - -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) -{ - // ### TODO: Need istream for frameset - // Hack alert - the following code does not work for really large messages - - string msgContent; - - try { - msg.getMessage().getFrames().getContent(msgContent); - - QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); - QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - - boost::scoped_ptr context(query->createDynamicContext()); - if (!context.get()) { - throw InternalErrorException(QPID_MSG("Query context looks munged ...")); - } - - XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); - Sequence seq(context->parseDocument(xml)); - - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - // ### TODO: Do types properly - if (v->second->convertsTo()) { - QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - } - } - - if(!seq.isEmpty() && seq.first()->isNode()) { - context->setContextItem(seq.first()); - context->setContextPosition(1); - context->setContextSize(1); - } - Result result = query->execute(context.get()); - return result->getEffectiveBooleanValue(context.get(), 0); - } - catch (XQException& e) { - QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); - return 0; - } - catch (...) { - QPID_LOG(warning, "Unexpected error routing message: " << msgContent); - return 0; - } - return 0; -} - -void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) -{ - try { - XmlBinding::vector::ConstPtr p; - { - RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p) return; - } - int count(0); - - for (std::vector::const_iterator i = p->begin(); i != p->end(); i++) { - if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... - msg.deliverTo((*i)->queue); - count++; - QPID_LOG(trace, "Delivered to queue" ); - - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - if (!count) { - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } - } catch (...) { - QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); - } - - -} - - -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) -{ - RWlock::ScopedRlock l(lock); - if (routingKey) { - XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); - - if (i == bindingsMap.end()) - return false; - if (!queue) - return true; - XmlBinding::vector::ConstPtr p = i->second.snapshot(); - return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); - } else if (!queue) { - //if no queue or routing key is specified, just report whether any bindings exist - return bindingsMap.size() > 0; - } else { - for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { - XmlBinding::vector::ConstPtr p = i->second.snapshot(); - if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; - } - return false; - } - -} - - -XmlExchange::~XmlExchange() -{ - bindingsMap.clear(); -} - -const std::string XmlExchange::typeName("xml"); - -} -} diff --git a/cpp/src/qpid/broker/XmlExchange.h b/cpp/src/qpid/broker/XmlExchange.h deleted file mode 100644 index 57d6c26e0d..0000000000 --- a/cpp/src/qpid/broker/XmlExchange.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _XmlExchange_ -#define _XmlExchange_ - -#include "Exchange.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/CopyOnWriteArray.h" -#include "qpid/sys/Monitor.h" -#include "Queue.h" - -#include - -#include - -#include -#include - -namespace qpid { -namespace broker { - -class XmlExchange : public virtual Exchange { - - typedef boost::shared_ptr Query; - - struct XmlBinding : public Exchange::Binding { - typedef boost::shared_ptr shared_ptr; - typedef qpid::sys::CopyOnWriteArray vector; - - boost::shared_ptr xquery; - - XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query): - Binding(key, queue, parent), xquery(query) {} - }; - - - typedef std::map XmlBindingsMap; - - XmlBindingsMap bindingsMap; - XQilla xqilla; - qpid::sys::RWlock lock; - - bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args); - - public: - static const std::string typeName; - - XmlExchange(const std::string& name, management::Manageable* parent = 0); - XmlExchange(const string& _name, bool _durable, - const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); - - virtual std::string getType() const { return typeName; } - - virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); - - virtual ~XmlExchange(); -}; - - -} -} - - -#endif diff --git a/cpp/src/qpid/xml/XmlBinding.h b/cpp/src/qpid/xml/XmlBinding.h new file mode 100644 index 0000000000..cc6b4dca5d --- /dev/null +++ b/cpp/src/qpid/xml/XmlBinding.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include + +#ifndef _XmlBinding_ +#define _XmlBinding_ + +namespace qpid { +namespace client { + +class XmlBinding : public framing::FieldTable { + public: + setQuery(string query) { setString("xquery", query); } +}; + +} +} +#endif diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp new file mode 100644 index 0000000000..53eb0f20b8 --- /dev/null +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -0,0 +1,261 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" + +#include "XmlExchange.h" + +#include "qpid/broker/DeliverableMessage.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" + +#include "qpid/Plugin.h" + +#include +#include +#include + +#include +#include + +using namespace qpid::framing; +using namespace qpid::sys; +using qpid::management::Manageable; +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { + + +XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange != 0) + mgmtExchange->set_type (typeName); +} + +XmlExchange::XmlExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange != 0) + mgmtExchange->set_type (typeName); +} + +/* + * Use the name of the query as the binding key. + * + * The first time a given name is used in a binding, the query body + * must be provided.After that, no query body should be present. + * + * To modify an installed query, the user must first unbind the + * existing query, then replace it by binding again with the same + * name. + * + */ + + // #### TODO: The Binding should take the query text + // #### only. Consider encapsulating the entire block, including + // #### the if condition. + + +bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) +{ + string queryText = bindingArguments->getAsString("xquery"); + + try { + RWlock::ScopedWlock l(lock); + + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::ConstPtr p = bindings.snapshot(); + if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + bindings.add(binding); + QPID_LOG(trace, "Bound successfully with query: " << queryText ); + + if (mgmtExchange != 0) { + mgmtExchange->inc_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); + } + return true; + } else { + return false; + } + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + catch (...) { + throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText)); + } +} + +bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +{ + RWlock::ScopedWlock l(lock); + if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) { + if (mgmtExchange != 0) { + mgmtExchange->dec_bindingCount(); + ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); + } + return true; + } else { + return false; + } +} + +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) +{ + // ### TODO: Need istream for frameset + // Hack alert - the following code does not work for really large messages + + string msgContent; + + try { + msg.getMessage().getFrames().getContent(msgContent); + + QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]"); + QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); + + boost::scoped_ptr context(query->createDynamicContext()); + if (!context.get()) { + throw InternalErrorException(QPID_MSG("Query context looks munged ...")); + } + + XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); + Sequence seq(context->parseDocument(xml)); + + if (args) { + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + } + + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + Result result = query->execute(context.get()); + return result->getEffectiveBooleanValue(context.get(), 0); + } + catch (XQException& e) { + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + return 0; + } + catch (...) { + QPID_LOG(warning, "Unexpected error routing message: " << msgContent); + return 0; + } + return 0; +} + +void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) +{ + PreRoute pr(msg, this); + try { + XmlBinding::vector::ConstPtr p; + { + RWlock::ScopedRlock l(lock); + p = bindingsMap[routingKey].snapshot(); + if (!p) return; + } + int count(0); + + for (std::vector::const_iterator i = p->begin(); i != p->end(); i++) { + if ((*i)->xquery && matches((*i)->xquery, msg, args)) { // Overly defensive? There should always be a query ... + msg.deliverTo((*i)->queue); + count++; + QPID_LOG(trace, "Delivered to queue" ); + + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } + if (!count) { + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } else { + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + } catch (...) { + QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); + } + + +} + + +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +{ + RWlock::ScopedRlock l(lock); + if (routingKey) { + XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + + if (i == bindingsMap.end()) + return false; + if (!queue) + return true; + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end(); + } else if (!queue) { + //if no queue or routing key is specified, just report whether any bindings exist + return bindingsMap.size() > 0; + } else { + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) { + XmlBinding::vector::ConstPtr p = i->second.snapshot(); + if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true; + } + return false; + } + +} + + +XmlExchange::~XmlExchange() +{ + bindingsMap.clear(); +} + +const std::string XmlExchange::typeName("xml"); + +} +} diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h new file mode 100644 index 0000000000..066a26489d --- /dev/null +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _XmlExchange_ +#define _XmlExchange_ + +#include "qpid/broker/Exchange.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/CopyOnWriteArray.h" +#include "qpid/sys/Monitor.h" +#include "qpid/broker/Queue.h" + +#include + +#include + +#include +#include + +namespace qpid { +namespace broker { + +class XmlExchange : public virtual Exchange { + + typedef boost::shared_ptr Query; + + struct XmlBinding : public Exchange::Binding { + typedef boost::shared_ptr shared_ptr; + typedef qpid::sys::CopyOnWriteArray vector; + + boost::shared_ptr xquery; + + XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query): + Binding(key, queue, parent), xquery(query) {} + }; + + + typedef std::map XmlBindingsMap; + + XmlBindingsMap bindingsMap; + XQilla xqilla; + qpid::sys::RWlock lock; + + bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args); + + public: + static const std::string typeName; + + XmlExchange(const std::string& name, management::Manageable* parent = 0); + XmlExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); + + virtual std::string getType() const { return typeName; } + + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + + virtual ~XmlExchange(); +}; + + +} +} + + +#endif diff --git a/cpp/src/qpid/xml/XmlExchangePlugin.cpp b/cpp/src/qpid/xml/XmlExchangePlugin.cpp new file mode 100644 index 0000000000..97e221589d --- /dev/null +++ b/cpp/src/qpid/xml/XmlExchangePlugin.cpp @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "qpid/acl/Acl.h" +#include "qpid/broker/Broker.h" +#include "qpid/Plugin.h" +#include "qpid/shared_ptr.h" +#include "qpid/log/Statement.h" + +#include + +#include "XmlExchange.h" + +namespace qpid { +namespace broker { // ACL uses the acl namespace here - should I? + +using namespace std; + +Exchange::shared_ptr create(const std::string& name, bool durable, + const framing::FieldTable& args, + management::Manageable* parent) +{ + Exchange::shared_ptr e(new XmlExchange(name, durable, args, parent)); + return e; +} + + +class XmlExchangePlugin : public Plugin +{ +public: + void earlyInitialize(Plugin::Target& target); + void initialize(Plugin::Target& target); +}; + + +void XmlExchangePlugin::initialize(Plugin::Target& target) +{ + Broker* broker = dynamic_cast(&target); + if (broker) { + broker->getExchanges().registerType(XmlExchange::typeName, &create); + QPID_LOG(info, "Registered xml exchange"); + } +} + +void XmlExchangePlugin::earlyInitialize(Target&) {} + + +static XmlExchangePlugin matchingPlugin; + + +}} // namespace qpid::acl -- cgit v1.2.1