diff options
author | Alan Conway <aconway@apache.org> | 2008-05-06 17:52:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-06 17:52:03 +0000 |
commit | 8d8a9162f7ba5a99a7c8b8b57aae860ab3028078 (patch) | |
tree | aa17b9f56448a5280e13441a7b0473dd5faee283 /cpp | |
parent | e4a3049c22b36171d204a8f84ddbcbf5accf797f (diff) | |
download | qpid-python-8d8a9162f7ba5a99a7c8b8b57aae860ab3028078.tar.gz |
From https://issues.apache.org/jira/browse/QPID-879 contributed by Jonathan Robie.
XML exchange allowing messages to be routed base on XQuery expressions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@653854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/configure.ac | 34 | ||||
-rw-r--r-- | cpp/examples/Makefile.am | 9 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/Makefile | 7 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/README | 48 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/declare_queues.cpp | 76 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/listener.cpp | 90 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/xml_producer.cpp | 88 | ||||
-rw-r--r-- | cpp/qpidc.spec.in | 3 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Deliverable.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 249 | ||||
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.h | 87 | ||||
-rw-r--r-- | cpp/src/qpid/client/Exchange.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldTable.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/.valgrind.supp | 8 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/XmlClientSessionTest.cpp | 157 |
20 files changed, 909 insertions, 12 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 597891b741..63513eea12 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -139,6 +139,7 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],, # Check for optional CPG requirement. LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" + AC_ARG_WITH([cpg], [AS_HELP_STRING([--with-cpg], [Build with CPG support])], [case "${withval}" in @@ -163,7 +164,6 @@ if test x$with_CPG = xyes; then CPPFLAGS+=" -DCPG" fi - # Setup --with-sasl/--without-sasl as arguments to configure AC_ARG_WITH([sasl], [AS_HELP_STRING([--with-sasl], [Build with SASL authentication support])], @@ -194,6 +194,38 @@ AS_IF([test "x$WANT_SASL" != xno], [The SASL app name for the qpid Broker]) AC_DEFINE([HAVE_SASL], [1], [Enable if libsasl is present])])]) + +# Setup --with-xml/--without-xml as arguments to configure +use_xml=yes +want_xml=check +AC_ARG_WITH([xml], + [AS_HELP_STRING([--with-xml], [Build with XML Exchange])], + [want_xml=$withval]) + +case $want_xml in + yes|no|check) ;; + *) AC_MSG_ERROR([Bad value for --with-xml: $withval]) ;; +esac + +test $want_xml = no && use_xml=no + +# If the user doesn't say not to use XML, see if it's available. +if test $use_xml != no; then + # Then see if XQilla is available + AC_CHECK_LIB([xerces-c], [_init], , [use_xml=no]) + AC_CHECK_HEADER([xqilla/xqilla-simple.hpp], , [use_xml=no]) + AC_CHECK_LIB([xqilla], [canonicalCombiningClassTable], , [use_xml=no]) + + # If XQilla is not available, yet specifically requested, die. + test $use_xml:$want_xml = no:yes && + AC_MSG_ERROR([XML Exchange requested, but XQilla or Xerces-C not available]) + + # Else XQilla is available - use it to build + AC_DEFINE([HAVE_XML], [1], [Compile-in XML Exchange support.]) +fi + +AM_CONDITIONAL([HAVE_XML], [test $use_xml = yes]) + # Setup --with-rdma/--without-rdma as arguments to configure AC_ARG_WITH([rdma], [AS_HELP_STRING([--with-rdma], [Build with support for Remote DMA protocols])], diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am index 6d1d122744..54c59810ed 100644 --- a/cpp/examples/Makefile.am +++ b/cpp/examples/Makefile.am @@ -15,7 +15,11 @@ nobase_pkgdata_DATA= \ examples/direct/Makefile \ examples/direct/direct_producer.cpp \ examples/direct/listener.cpp \ - examples/direct/declare_queues.cpp + examples/direct/declare_queues.cpp \ + examples/xml-exchange/Makefile \ + examples/xml-exchange/declare_queues.cpp \ + examples/xml-exchange/xml_producer.cpp \ + examples/xml-exchange/listener.cpp VERIFY_FILES= verify verify_all \ examples/request-response/verify \ @@ -60,9 +64,6 @@ all-local: test -d examples || cp -R $(srcdir)/examples . cd examples && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(CXXFLAGS) -I$(abs_top_srcdir)/src -I$(abs_top_srcdir)/src/gen -I$(abs_top_builddir)/src -I$(abs_top_builddir)/src/gen -L$(abs_top_builddir)/src/.libs -Wl,-rpath,$(abs_top_builddir)/src/.libs" all -# FIXME aconway 2008-03-25: Re-enable when python client has been fixed -# to find .spec via PYTHONPATH. -# # Verify the examples in the buid tree. check-local: all-local verify $(srcdir)/verify_all $(abs_top_srcdir)/.. diff --git a/cpp/examples/examples/xml-exchange/Makefile b/cpp/examples/examples/xml-exchange/Makefile new file mode 100644 index 0000000000..e598dd3be3 --- /dev/null +++ b/cpp/examples/examples/xml-exchange/Makefile @@ -0,0 +1,7 @@ +CXX=g++ +CXXFLAGS= +LDFLAGS=-lqpidclient +PROGRAMS=declare_queues xml_producer listener +all: $(PROGRAMS) +clean: + rm -f $(PROGRAMS) diff --git a/cpp/examples/examples/xml-exchange/README b/cpp/examples/examples/xml-exchange/README new file mode 100644 index 0000000000..26b9fac97a --- /dev/null +++ b/cpp/examples/examples/xml-exchange/README @@ -0,0 +1,48 @@ +This example shows how to program a simple application +using the XML Exchange. + +To run the example, execute the programs in the +following order: + +1 ./declare_queues +2 ./listener +3 ./message_producer (in a separate window) + +The XML Exchange must be explicitly declared. Bindings +are established using queries in XQuery. These queries +can reference message content, message application +properties (which are declared as external variables +in the XQuery), or both. + +Once this is done, message producers publish to the +exchange using the exchange name and a routing key, +just as for other exchange types. Message consumers +read from the queues to which messages are routed. +If a message does not have XML content, or is +missing message application properties needed by +the query, the query is not routed. + +Queries can use message application headers to +provide functionality similar to JMS selectors. +If a query does not use the content of a message, +the message content is not parsed, and need not +be XML. + +The XQuery processor, XQilla, does path-based +document projection, so once the portion of +a document needed to evaluate a query has +been read, it stops parsing the document. +Suppose a long document has a header section. +You can indicate in the query that only +one header section needs to be queried, +and there is no need to parse the entire +document to see if there are further header +sections, using a path like this: + +./message/header[1]/date + +If you used a path like this, all children +of the message element would be read to +see if there are further headers: + +./message/header/date diff --git a/cpp/examples/examples/xml-exchange/declare_queues.cpp b/cpp/examples/examples/xml-exchange/declare_queues.cpp new file mode 100644 index 0000000000..44e3de33f8 --- /dev/null +++ b/cpp/examples/examples/xml-exchange/declare_queues.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + Message msg; + try { + connection.open(host, port); + Session session = connection.newSession(ASYNC); + + + //--------- Main body of program -------------------------------------------- + + // Set up queues, bind them with queries. Note that the XML exchange + // is not in the AMQP specification, so it is called "xml", not "amq.xml". + // Note that the XML exchange is not predeclared in Qpid, it must + // be declared by the application. + + session.queueDeclare(arg::queue="message_queue"); + session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); + + // Application message properties are mapped to external variables + // in the XQuery. An XML Exchange can query message properties much + // like JMS, query the XML content of the message, or both. + + FieldTable binding; + binding.setString("xquery", "declare variable $control external;" + "./message/id mod 2 = 1 or $control = 'end'"); + session.exchangeBind(arg::exchange="xml", arg::queue="message_queue", arg::bindingKey="query_name", arg::arguments=binding); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/examples/xml-exchange/listener.cpp b/cpp/examples/examples/xml-exchange/listener.cpp new file mode 100644 index 0000000000..b5e057c4b1 --- /dev/null +++ b/cpp/examples/examples/xml-exchange/listener.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * listener.cpp: This program reads messages fro a queue on + * the broker using a message listener. + */ + +#include <qpid/client/Dispatcher.h> +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << std::endl; + if (message.getHeaders().getString("control") == "end") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + Message msg; + try { + connection.open(host, port); + Session session = connection.newSession(ASYNC); + + //--------- Main body of program -------------------------------------------- + + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Deliver messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/examples/xml-exchange/xml_producer.cpp b/cpp/examples/examples/xml-exchange/xml_producer.cpp new file mode 100644 index 0000000000..9333e20438 --- /dev/null +++ b/cpp/examples/examples/xml-exchange/xml_producer.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + Message message; + try { + connection.open(host, port); + Session session = connection.newSession(ASYNC); + + //--------- Main body of program -------------------------------------------- + + // Publish some XML messages. Use the control property to + // indicate when we are finished. + // + // In the XML exchange, the routing key and the name of + // the query match. + + message.getDeliveryProperties().setRoutingKey("query_name"); + message.getHeaders().setString("control","continue"); + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "<message><id>" << i << "</id></message>"; + + std::cout << "Message data: " << message_data.str() << std::endl; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message, arg::destination="xml"); + } + + // And send a final message to indicate termination. + + message.getHeaders().setString("control","end"); + message.setData("<end>That's all, folks!</end>"); + session.messageTransfer(arg::content=message, arg::destination="xml"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/qpidc.spec.in b/cpp/qpidc.spec.in index 3fc88b46bf..eb62b30d01 100644 --- a/cpp/qpidc.spec.in +++ b/cpp/qpidc.spec.in @@ -1,3 +1,4 @@ + # # Spec file for Qpid C++ packages: qpidc qpidc-devel, qpidd, qpidd-devel # svn revision: $Rev$ @@ -54,6 +55,8 @@ Summary: An AMQP message broker daemon Group: System Environment/Daemons Requires: %name = %version-%release Requires: openais +Requires: xqilla +Requires: xerces-c %description -n %{qpidd} A message broker daemon that receives stores and routes messages using diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index bbe4cb73d3..5c052b0fe3 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -46,7 +46,7 @@ DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h ## Compiler flags -AM_CXXFLAGS = $(WARNING_CFLAGS) +AM_CXXFLAGS = $(WARNING_CFLAGS) $(CFLAGS) AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) INCLUDES = -Igen -I$(srcdir)/gen @@ -221,8 +221,12 @@ libqpidcommon_la_SOURCES = \ qpid/ISList.h \ qpid/pointer_to_other.h -libqpidbroker_la_LIBADD = libqpidcommon.la \ - -luuid +libqpidbroker_la_LIBADD = libqpidcommon.la -luuid +if HAVE_XML +libqpidbroker_la_LIBADD += -lxerces-c -lxqilla +endif + + libqpidbroker_la_SOURCES = \ $(mgen_broker_cpp) \ qpid/amqp_0_10/Connection.h \ @@ -288,8 +292,13 @@ libqpidbroker_la_SOURCES = \ qpid/management/ManagementObject.cpp \ qpid/sys/TCPIOPlugin.cpp -libqpidclient_la_LIBADD = libqpidcommon.la \ - -luuid +if HAVE_XML +libqpidbroker_la_SOURCES += qpid/broker/XmlExchange.cpp +endif + + +libqpidclient_la_LIBADD = libqpidcommon.la -luuid + libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ qpid/client/Bounds.cpp \ @@ -528,6 +537,11 @@ nobase_include_HEADERS = \ qpid/sys/Time.h \ qpid/sys/TimeoutHandler.h +if HAVE_XML +nobase_include_HEADERS += qpid/broker/XmlExchange.h +endif + + # Force build of qpidd during dist phase so help2man will work. dist-hook: $(BUILT_SOURCES) $(MAKE) qpidd diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index e46d2024bf..c40780c4ae 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -22,6 +22,7 @@ #define _Deliverable_ #include "Queue.h" +#include "Message.h" namespace qpid { namespace broker { @@ -29,6 +30,9 @@ namespace qpid { public: bool delivered; Deliverable() : delivered(false) {} + + virtual Message& getMessage() = 0; + virtual void deliverTo(Queue::shared_ptr& queue) = 0; virtual uint64_t contentSize() { return 0; } virtual ~Deliverable(){} diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 58d9d5efb8..c1eb5ff5a3 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -18,11 +18,16 @@ * under the License. * */ + +#include "config.h" #include "ExchangeRegistry.h" #include "DirectExchange.h" #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" +#ifdef HAVE_XML +#include "XmlExchange.h" +#endif #include "qpid/management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" @@ -55,7 +60,13 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent)); }else if (type == ManagementExchange::typeName) { exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent)); - }else{ + } +#ifdef HAVE_XML + else if (type == XmlExchange::typeName) { + exchange = Exchange::shared_ptr(new XmlExchange(name, durable, args, parent)); + } +#endif + else{ throw UnknownExchangeTypeException(); } exchanges[name] = exchange; diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 680e0c7546..d2590debfb 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -69,6 +69,8 @@ namespace qpid { virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); + + virtual Message& getMessage() { return *msg; }; virtual void deliverTo(Queue::shared_ptr& queue); diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp new file mode 100644 index 0000000000..8577e9211c --- /dev/null +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "XmlExchange.h" + +#include "DeliverableMessage.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" + +#include <xercesc/framework/MemBufInputSource.hpp> + +#include <xqilla/context/ItemFactory.hpp> +#include <xqilla/xqilla-simple.hpp> + +#include <iostream> +#include <sstream> + +using namespace qpid::framing; +using namespace qpid::sys; +using qpid::management::Manageable; + +namespace qpid { +namespace broker { + +XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +XmlExchange::XmlExchange(const std::string& _name, bool _durable, + const FieldTable& _args, Manageable* _parent) : + Exchange(_name, _durable, _args, _parent) +{ + if (mgmtExchange.get() != 0) + mgmtExchange->set_type (typeName); +} + +/* + * Use the name of the query as the binding key. + * + * The first time a given name is used in a binding, the query body + * must be provided.After that, no query body should be present. + * + * To modify an installed query, the user must first unbind the + * existing query, then replace it by binding again with the same + * name. + * + */ + + // #### TODO: The Binding should take the query text + // #### only. Consider encapsulating the entire block, including + // #### the if condition. + + +bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) +{ + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + + string queryText = bindingArguments->getString("xquery"); + + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i == bindings.end()) { + + try { + Query query(xqilla.parse(X(queryText.c_str()))); + XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query)); + XmlBinding::vector bindings(1, binding); + bindingsMap[routingKey] = bindings; + } + catch (XQException& e) { + throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText)); + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_bindings (); + } + return true; + } else{ + return false; + } +} + +bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/) +{ + RWlock::ScopedWlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + + for (i = bindings.begin(); i != bindings.end(); i++) + if ((*i)->queue == queue) + break; + + if (i < bindings.end()) { + bindings.erase(i); + if (bindings.empty()) { + bindingsMap.erase(routingKey); + } + if (mgmtExchange.get() != 0) { + mgmtExchange->dec_bindings (); + } + return true; + } else { + return false; + } +} + +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) +{ + // ### TODO: Need istream for frameset + // Hack alert - the following code does not work for really large messages + + string msgContent; + msg.getMessage().getFrames().getContent(msgContent); + + boost::scoped_ptr<DynamicContext> context(query->createDynamicContext()); + + try { + XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" ); + Sequence seq(context->parseDocument(xml)); + + FieldTable::ValueMap::const_iterator v = args->begin(); + for(; v != args->end(); ++v) { + // ### TODO: Do types properly + if (v->second->convertsTo<std::string>()) { + QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str()); + Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get()); + context->setExternalVariable(X(v->first.c_str()), value); + } + } + + if(!seq.isEmpty() && seq.first()->isNode()) { + context->setContextItem(seq.first()); + context->setContextPosition(1); + context->setContextSize(1); + } + Result result = query->execute(context.get()); + return result->getEffectiveBooleanValue(context.get(), 0); + } + catch (XQException& e) { + QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent); + } + return 0; +} + +void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args) +{ + RWlock::ScopedRlock l(lock); + XmlBinding::vector& bindings(bindingsMap[routingKey]); + XmlBinding::vector::iterator i; + int count(0); + + for (i = bindings.begin(); i != bindings.end(); i++, count++) { + + if (matches((*i)->xquery, msg, args)) { + msg.deliverTo((*i)->queue); + + if ((*i)->mgmtBinding.get() != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } + + if(!count){ + QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey); + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } + else { + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange.get() != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } + +} + + +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +{ + XmlBinding::vector::iterator j; + + if (routingKey) { + XmlBindingsMap::iterator i = bindingsMap.find(*routingKey); + + if (i == bindingsMap.end()) + return false; + if (!queue) + return true; + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; + } else if (!queue) { + //if no queue or routing key is specified, just report whether any bindings exist + return bindingsMap.size() > 0; + } else { + for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) + for (j = i->second.begin(); j != i->second.end(); j++) + if ((*j)->queue == queue) + return true; + return false; + } + + return false; +} + + +XmlExchange::~XmlExchange() +{ + bindingsMap.clear(); +} + +const std::string XmlExchange::typeName("xml"); + +} +} diff --git a/cpp/src/qpid/broker/XmlExchange.h b/cpp/src/qpid/broker/XmlExchange.h new file mode 100644 index 0000000000..883bfceaca --- /dev/null +++ b/cpp/src/qpid/broker/XmlExchange.h @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _XmlExchange_ +#define _XmlExchange_ + +#include "Exchange.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Monitor.h" +#include "Queue.h" + +#include <xqilla/xqilla-simple.hpp> + +#include <boost/scoped_ptr.hpp> + +#include <map> +#include <vector> + +namespace qpid { +namespace broker { + +class XmlExchange : public virtual Exchange { + + typedef boost::shared_ptr<XQQuery> Query; + + struct XmlBinding : public Exchange::Binding { + typedef boost::shared_ptr<XmlBinding> shared_ptr; + typedef std::vector<XmlBinding::shared_ptr> vector; + + boost::shared_ptr<XQQuery> xquery; + + XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query): + Binding(key, queue, parent), xquery(query) {} + }; + + + typedef std::map<string, XmlBinding::vector > XmlBindingsMap; + + XmlBindingsMap bindingsMap; + XQilla xqilla; + qpid::sys::RWlock lock; + + bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args); + + public: + static const std::string typeName; + + XmlExchange(const std::string& name, management::Manageable* parent = 0); + XmlExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, management::Manageable* parent = 0); + + virtual std::string getType() const { return typeName; } + + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + + virtual ~XmlExchange(); +}; + + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h index 239c131658..f30881c94b 100644 --- a/cpp/src/qpid/client/Exchange.h +++ b/cpp/src/qpid/client/Exchange.h @@ -69,6 +69,13 @@ namespace client { * match zero or more words. */ static const std::string TOPIC_EXCHANGE; + + /** + * + */ + + static const std::string XML_EXCHANGE; + /** * The headers exchange routes messages based on whether their * headers match the binding arguments specified when diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h index 707496a861..3c65d31aee 100644 --- a/cpp/src/qpid/framing/FieldTable.h +++ b/cpp/src/qpid/framing/FieldTable.h @@ -48,6 +48,7 @@ class FieldTable public: typedef boost::shared_ptr<FieldValue> ValuePtr; typedef std::map<std::string, ValuePtr> ValueMap; + typedef ValueMap::iterator iterator; ~FieldTable(); uint32_t size() const; @@ -79,6 +80,10 @@ class FieldTable ValueMap::const_iterator begin() const { return values.begin(); } ValueMap::const_iterator end() const { return values.end(); } ValueMap::const_iterator find(const std::string& s) const { return values.find(s); } + + // ### Hack Alert + + ValueMap::iterator getValues() { return values.begin(); } private: ValueMap values; diff --git a/cpp/src/tests/.valgrind.supp b/cpp/src/tests/.valgrind.supp index e0abf0dd43..3b7b7c9803 100644 --- a/cpp/src/tests/.valgrind.supp +++ b/cpp/src/tests/.valgrind.supp @@ -30,3 +30,11 @@ fun:epoll_ctl } +{ + "Conditional jump or move depends on uninitialised value(s)" from Xerces parser + Memcheck:Cond + fun:_ZN11xercesc_2_717XMLUTF8Transcoder13transcodeFromEPKhjPtjRjPh + fun:_ZN11xercesc_2_79XMLReader14xcodeMoreCharsEPtPhj + fun:_ZN11xercesc_2_79XMLReader17refreshCharBufferEv +} + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index f215c9f0b4..6fd9cbcb76 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -1,4 +1,4 @@ -AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK +AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(CFLAGS) $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen abs_builddir=@abs_builddir@ @@ -29,6 +29,7 @@ TESTS+=unit_test check_PROGRAMS+=unit_test unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ $(lib_client) $(lib_broker) # $(lib_amqp_0_10) + unit_test_SOURCES= unit_test.cpp unit_test.h \ BrokerFixture.h SocketProxy.h \ exception_test.cpp \ @@ -45,6 +46,11 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ IncompleteMessageList.cpp \ RangeSet.cpp +if HAVE_XML +unit_test_SOURCES+= XmlClientSessionTest.cpp +endif + + # Disabled till we move to amqp_0_10 codec. # amqp_0_10/serialize.cpp allSegmentTypes.h \ # amqp_0_10/ProxyTemplate.cpp \ diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index aec59a58bc..9abf863ad4 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -54,11 +54,13 @@ public: class FailOnDeliver : public Deliverable { + Message msg; public: void deliverTo(Queue::shared_ptr& queue) { throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } + Message& getMessage() { return msg; } }; class QueueTest : public CppUnit::TestCase diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp new file mode 100644 index 0000000000..b5c685abbf --- /dev/null +++ b/cpp/src/tests/XmlClientSessionTest.cpp @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apachef Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "unit_test.h" +#include "BrokerFixture.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Dispatcher.h" +#include "qpid/client/LocalQueue.h" +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionManager.h" + +#include <boost/optional.hpp> +#include <boost/lexical_cast.hpp> + +#include <vector> + +QPID_AUTO_TEST_SUITE(XmlClientSessionTest) + +using namespace qpid::client; + +using namespace qpid::client::arg; +using namespace qpid::framing; +using namespace qpid; +using qpid::sys::Monitor; +using std::string; +using std::cout; +using std::endl; + + +struct DummyListener : public sys::Runnable, public MessageListener { + std::vector<Message> messages; + string name; + uint expected; + Dispatcher dispatcher; + + DummyListener(Session& session, const string& n, uint ex) : + name(n), expected(ex), dispatcher(session) {} + + void run() + { + dispatcher.listen(name, this); + dispatcher.run(); + } + + void received(Message& msg) + { + messages.push_back(msg); + if (--expected == 0) + dispatcher.stop(); + } +}; + + +class SubscribedLocalQueue : public LocalQueue { + private: + SubscriptionManager& subscriptions; + public: + SubscribedLocalQueue(SubscriptionManager& subs) : subscriptions(subs) {} + Message get () { return pop(); } + virtual ~SubscribedLocalQueue() {} +}; + + +struct SimpleListener : public MessageListener +{ + Monitor lock; + std::vector<Message> messages; + + void received(Message& msg) + { + Monitor::ScopedLock l(lock); + messages.push_back(msg); + lock.notifyAll(); + } + + void waitFor(const uint n) + { + Monitor::ScopedLock l(lock); + while (messages.size() < n) { + lock.wait(); + } + } +}; + +struct ClientSessionFixture : public ProxySessionFixture +{ + void declareSubscribe(const string& q="odd_blue", + const string& dest="xml") + { + session.queueDeclare(queue=q); + session.messageSubscribe(queue=q, destination=dest, acquireMode=1); + session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);//messages + session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes + } +}; + +// ########### START HERE #################################### + +BOOST_AUTO_TEST_CASE(testXmlBinding) { + ClientSessionFixture f; + + Session session = f.connection.newSession(ASYNC); + SubscriptionManager subscriptions(session); + SubscribedLocalQueue localQueue(subscriptions); + + session.exchangeDeclare(qpid::client::arg::exchange="xml", qpid::client::arg::type="xml"); + session.queueDeclare(qpid::client::arg::queue="odd_blue"); + subscriptions.subscribe(localQueue, "odd_blue"); + + FieldTable binding; + binding.setString("xquery", "declare variable $color external;" + "(./message/id mod 2 = 1) and ($color = 'blue')"); + session.exchangeBind(qpid::client::arg::exchange="xml", qpid::client::arg::queue="odd_blue", qpid::client::arg::bindingKey="query_name", qpid::client::arg::arguments=binding); + + Message message; + message.getDeliveryProperties().setRoutingKey("query_name"); + + message.getHeaders().setString("color", "blue"); + string m = "<message><id>1</id></message>"; + message.setData(m); + + session.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="xml"); + + Message m2 = localQueue.get(); + BOOST_CHECK_EQUAL(m, m2.getData()); +} + +//### Test: Bad XML does not kill the server + +//### Test: Bad XQuery does not kill the server + +//### Test: Bindings persist, surviving broker restart + +QPID_AUTO_TEST_SUITE_END() + |