summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-06 17:52:03 +0000
committerAlan Conway <aconway@apache.org>2008-05-06 17:52:03 +0000
commit8d8a9162f7ba5a99a7c8b8b57aae860ab3028078 (patch)
treeaa17b9f56448a5280e13441a7b0473dd5faee283 /cpp/src
parente4a3049c22b36171d204a8f84ddbcbf5accf797f (diff)
downloadqpid-python-8d8a9162f7ba5a99a7c8b8b57aae860ab3028078.tar.gz
From https://issues.apache.org/jira/browse/QPID-879 contributed by Jonathan Robie.
XML exchange allowing messages to be routed base on XQuery expressions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@653854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am24
-rw-r--r--cpp/src/qpid/broker/Deliverable.h4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp13
-rw-r--r--cpp/src/qpid/broker/TxPublish.h2
-rw-r--r--cpp/src/qpid/broker/XmlExchange.cpp249
-rw-r--r--cpp/src/qpid/broker/XmlExchange.h87
-rw-r--r--cpp/src/qpid/client/Exchange.h7
-rw-r--r--cpp/src/qpid/framing/FieldTable.h5
-rw-r--r--cpp/src/tests/.valgrind.supp8
-rw-r--r--cpp/src/tests/Makefile.am8
-rw-r--r--cpp/src/tests/QueueTest.cpp2
-rw-r--r--cpp/src/tests/XmlClientSessionTest.cpp157
12 files changed, 559 insertions, 7 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index bbe4cb73d3..5c052b0fe3 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,7 +46,7 @@ DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h
## Compiler flags
-AM_CXXFLAGS = $(WARNING_CFLAGS)
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CFLAGS)
AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
INCLUDES = -Igen -I$(srcdir)/gen
@@ -221,8 +221,12 @@ libqpidcommon_la_SOURCES = \
qpid/ISList.h \
qpid/pointer_to_other.h
-libqpidbroker_la_LIBADD = libqpidcommon.la \
- -luuid
+libqpidbroker_la_LIBADD = libqpidcommon.la -luuid
+if HAVE_XML
+libqpidbroker_la_LIBADD += -lxerces-c -lxqilla
+endif
+
+
libqpidbroker_la_SOURCES = \
$(mgen_broker_cpp) \
qpid/amqp_0_10/Connection.h \
@@ -288,8 +292,13 @@ libqpidbroker_la_SOURCES = \
qpid/management/ManagementObject.cpp \
qpid/sys/TCPIOPlugin.cpp
-libqpidclient_la_LIBADD = libqpidcommon.la \
- -luuid
+if HAVE_XML
+libqpidbroker_la_SOURCES += qpid/broker/XmlExchange.cpp
+endif
+
+
+libqpidclient_la_LIBADD = libqpidcommon.la -luuid
+
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
qpid/client/Bounds.cpp \
@@ -528,6 +537,11 @@ nobase_include_HEADERS = \
qpid/sys/Time.h \
qpid/sys/TimeoutHandler.h
+if HAVE_XML
+nobase_include_HEADERS += qpid/broker/XmlExchange.h
+endif
+
+
# Force build of qpidd during dist phase so help2man will work.
dist-hook: $(BUILT_SOURCES)
$(MAKE) qpidd
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index e46d2024bf..c40780c4ae 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -22,6 +22,7 @@
#define _Deliverable_
#include "Queue.h"
+#include "Message.h"
namespace qpid {
namespace broker {
@@ -29,6 +30,9 @@ namespace qpid {
public:
bool delivered;
Deliverable() : delivered(false) {}
+
+ virtual Message& getMessage() = 0;
+
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 58d9d5efb8..c1eb5ff5a3 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -18,11 +18,16 @@
* under the License.
*
*/
+
+#include "config.h"
#include "ExchangeRegistry.h"
#include "DirectExchange.h"
#include "FanOutExchange.h"
#include "HeadersExchange.h"
#include "TopicExchange.h"
+#ifdef HAVE_XML
+#include "XmlExchange.h"
+#endif
#include "qpid/management/ManagementExchange.h"
#include "qpid/framing/reply_exceptions.h"
@@ -55,7 +60,13 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
}else if (type == ManagementExchange::typeName) {
exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
- }else{
+ }
+#ifdef HAVE_XML
+ else if (type == XmlExchange::typeName) {
+ exchange = Exchange::shared_ptr(new XmlExchange(name, durable, args, parent));
+ }
+#endif
+ else{
throw UnknownExchangeTypeException();
}
exchanges[name] = exchange;
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 680e0c7546..d2590debfb 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -69,6 +69,8 @@ namespace qpid {
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
+
+ virtual Message& getMessage() { return *msg; };
virtual void deliverTo(Queue::shared_ptr& queue);
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp
new file mode 100644
index 0000000000..8577e9211c
--- /dev/null
+++ b/cpp/src/qpid/broker/XmlExchange.cpp
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+#include "XmlExchange.h"
+
+#include "DeliverableMessage.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <xercesc/framework/MemBufInputSource.hpp>
+
+#include <xqilla/context/ItemFactory.hpp>
+#include <xqilla/xqilla-simple.hpp>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+using qpid::management::Manageable;
+
+namespace qpid {
+namespace broker {
+
+XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+XmlExchange::XmlExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+/*
+ * Use the name of the query as the binding key.
+ *
+ * The first time a given name is used in a binding, the query body
+ * must be provided.After that, no query body should be present.
+ *
+ * To modify an installed query, the user must first unbind the
+ * existing query, then replace it by binding again with the same
+ * name.
+ *
+ */
+
+ // #### TODO: The Binding should take the query text
+ // #### only. Consider encapsulating the entire block, including
+ // #### the if condition.
+
+
+bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
+{
+ RWlock::ScopedWlock l(lock);
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::iterator i;
+
+ string queryText = bindingArguments->getString("xquery");
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ if (i == bindings.end()) {
+
+ try {
+ Query query(xqilla.parse(X(queryText.c_str())));
+ XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
+ XmlBinding::vector bindings(1, binding);
+ bindingsMap[routingKey] = bindings;
+ }
+ catch (XQException& e) {
+ throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
+ return true;
+ } else{
+ return false;
+ }
+}
+
+bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+ RWlock::ScopedWlock l(lock);
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::iterator i;
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ if (i < bindings.end()) {
+ bindings.erase(i);
+ if (bindings.empty()) {
+ bindingsMap.erase(routingKey);
+ }
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args)
+{
+ // ### TODO: Need istream for frameset
+ // Hack alert - the following code does not work for really large messages
+
+ string msgContent;
+ msg.getMessage().getFrames().getContent(msgContent);
+
+ boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+
+ try {
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((XMLByte*)msgContent.c_str(), msgContent.length(), "input" );
+ Sequence seq(context->parseDocument(xml));
+
+ FieldTable::ValueMap::const_iterator v = args->begin();
+ for(; v != args->end(); ++v) {
+ // ### TODO: Do types properly
+ if (v->second->convertsTo<std::string>()) {
+ QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+ Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+ context->setExternalVariable(X(v->first.c_str()), value);
+ }
+ }
+
+ if(!seq.isEmpty() && seq.first()->isNode()) {
+ context->setContextItem(seq.first());
+ context->setContextPosition(1);
+ context->setContextSize(1);
+ }
+ Result result = query->execute(context.get());
+ return result->getEffectiveBooleanValue(context.get(), 0);
+ }
+ catch (XQException& e) {
+ QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
+ }
+ return 0;
+}
+
+void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
+{
+ RWlock::ScopedRlock l(lock);
+ XmlBinding::vector& bindings(bindingsMap[routingKey]);
+ XmlBinding::vector::iterator i;
+ int count(0);
+
+ for (i = bindings.begin(); i != bindings.end(); i++, count++) {
+
+ if (matches((*i)->xquery, msg, args)) {
+ msg.deliverTo((*i)->queue);
+
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+ }
+
+ if(!count){
+ QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ }
+
+}
+
+
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ XmlBinding::vector::iterator j;
+
+ if (routingKey) {
+ XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
+
+ if (i == bindingsMap.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
+ } else if (!queue) {
+ //if no queue or routing key is specified, just report whether any bindings exist
+ return bindingsMap.size() > 0;
+ } else {
+ for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
+ return false;
+ }
+
+ return false;
+}
+
+
+XmlExchange::~XmlExchange()
+{
+ bindingsMap.clear();
+}
+
+const std::string XmlExchange::typeName("xml");
+
+}
+}
diff --git a/cpp/src/qpid/broker/XmlExchange.h b/cpp/src/qpid/broker/XmlExchange.h
new file mode 100644
index 0000000000..883bfceaca
--- /dev/null
+++ b/cpp/src/qpid/broker/XmlExchange.h
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _XmlExchange_
+#define _XmlExchange_
+
+#include "Exchange.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Monitor.h"
+#include "Queue.h"
+
+#include <xqilla/xqilla-simple.hpp>
+
+#include <boost/scoped_ptr.hpp>
+
+#include <map>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class XmlExchange : public virtual Exchange {
+
+ typedef boost::shared_ptr<XQQuery> Query;
+
+ struct XmlBinding : public Exchange::Binding {
+ typedef boost::shared_ptr<XmlBinding> shared_ptr;
+ typedef std::vector<XmlBinding::shared_ptr> vector;
+
+ boost::shared_ptr<XQQuery> xquery;
+
+ XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query):
+ Binding(key, queue, parent), xquery(query) {}
+ };
+
+
+ typedef std::map<string, XmlBinding::vector > XmlBindingsMap;
+
+ XmlBindingsMap bindingsMap;
+ XQilla xqilla;
+ qpid::sys::RWlock lock;
+
+ bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args);
+
+ public:
+ static const std::string typeName;
+
+ XmlExchange(const std::string& name, management::Manageable* parent = 0);
+ XmlExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
+ virtual ~XmlExchange();
+};
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h
index 239c131658..f30881c94b 100644
--- a/cpp/src/qpid/client/Exchange.h
+++ b/cpp/src/qpid/client/Exchange.h
@@ -69,6 +69,13 @@ namespace client {
* match zero or more words.
*/
static const std::string TOPIC_EXCHANGE;
+
+ /**
+ *
+ */
+
+ static const std::string XML_EXCHANGE;
+
/**
* The headers exchange routes messages based on whether their
* headers match the binding arguments specified when
diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h
index 707496a861..3c65d31aee 100644
--- a/cpp/src/qpid/framing/FieldTable.h
+++ b/cpp/src/qpid/framing/FieldTable.h
@@ -48,6 +48,7 @@ class FieldTable
public:
typedef boost::shared_ptr<FieldValue> ValuePtr;
typedef std::map<std::string, ValuePtr> ValueMap;
+ typedef ValueMap::iterator iterator;
~FieldTable();
uint32_t size() const;
@@ -79,6 +80,10 @@ class FieldTable
ValueMap::const_iterator begin() const { return values.begin(); }
ValueMap::const_iterator end() const { return values.end(); }
ValueMap::const_iterator find(const std::string& s) const { return values.find(s); }
+
+ // ### Hack Alert
+
+ ValueMap::iterator getValues() { return values.begin(); }
private:
ValueMap values;
diff --git a/cpp/src/tests/.valgrind.supp b/cpp/src/tests/.valgrind.supp
index e0abf0dd43..3b7b7c9803 100644
--- a/cpp/src/tests/.valgrind.supp
+++ b/cpp/src/tests/.valgrind.supp
@@ -30,3 +30,11 @@
fun:epoll_ctl
}
+{
+ "Conditional jump or move depends on uninitialised value(s)" from Xerces parser
+ Memcheck:Cond
+ fun:_ZN11xercesc_2_717XMLUTF8Transcoder13transcodeFromEPKhjPtjRjPh
+ fun:_ZN11xercesc_2_79XMLReader14xcodeMoreCharsEPtPhj
+ fun:_ZN11xercesc_2_79XMLReader17refreshCharBufferEv
+}
+
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index f215c9f0b4..6fd9cbcb76 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -1,4 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(CFLAGS) $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK
INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen
abs_builddir=@abs_builddir@
@@ -29,6 +29,7 @@ TESTS+=unit_test
check_PROGRAMS+=unit_test
unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
$(lib_client) $(lib_broker) # $(lib_amqp_0_10)
+
unit_test_SOURCES= unit_test.cpp unit_test.h \
BrokerFixture.h SocketProxy.h \
exception_test.cpp \
@@ -45,6 +46,11 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
IncompleteMessageList.cpp \
RangeSet.cpp
+if HAVE_XML
+unit_test_SOURCES+= XmlClientSessionTest.cpp
+endif
+
+
# Disabled till we move to amqp_0_10 codec.
# amqp_0_10/serialize.cpp allSegmentTypes.h \
# amqp_0_10/ProxyTemplate.cpp \
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index aec59a58bc..9abf863ad4 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -54,11 +54,13 @@ public:
class FailOnDeliver : public Deliverable
{
+ Message msg;
public:
void deliverTo(Queue::shared_ptr& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
+ Message& getMessage() { return msg; }
};
class QueueTest : public CppUnit::TestCase
diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp
new file mode 100644
index 0000000000..b5c685abbf
--- /dev/null
+++ b/cpp/src/tests/XmlClientSessionTest.cpp
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apachef Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Dispatcher.h"
+#include "qpid/client/LocalQueue.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+
+#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include <vector>
+
+QPID_AUTO_TEST_SUITE(XmlClientSessionTest)
+
+using namespace qpid::client;
+
+using namespace qpid::client::arg;
+using namespace qpid::framing;
+using namespace qpid;
+using qpid::sys::Monitor;
+using std::string;
+using std::cout;
+using std::endl;
+
+
+struct DummyListener : public sys::Runnable, public MessageListener {
+ std::vector<Message> messages;
+ string name;
+ uint expected;
+ Dispatcher dispatcher;
+
+ DummyListener(Session& session, const string& n, uint ex) :
+ name(n), expected(ex), dispatcher(session) {}
+
+ void run()
+ {
+ dispatcher.listen(name, this);
+ dispatcher.run();
+ }
+
+ void received(Message& msg)
+ {
+ messages.push_back(msg);
+ if (--expected == 0)
+ dispatcher.stop();
+ }
+};
+
+
+class SubscribedLocalQueue : public LocalQueue {
+ private:
+ SubscriptionManager& subscriptions;
+ public:
+ SubscribedLocalQueue(SubscriptionManager& subs) : subscriptions(subs) {}
+ Message get () { return pop(); }
+ virtual ~SubscribedLocalQueue() {}
+};
+
+
+struct SimpleListener : public MessageListener
+{
+ Monitor lock;
+ std::vector<Message> messages;
+
+ void received(Message& msg)
+ {
+ Monitor::ScopedLock l(lock);
+ messages.push_back(msg);
+ lock.notifyAll();
+ }
+
+ void waitFor(const uint n)
+ {
+ Monitor::ScopedLock l(lock);
+ while (messages.size() < n) {
+ lock.wait();
+ }
+ }
+};
+
+struct ClientSessionFixture : public ProxySessionFixture
+{
+ void declareSubscribe(const string& q="odd_blue",
+ const string& dest="xml")
+ {
+ session.queueDeclare(queue=q);
+ session.messageSubscribe(queue=q, destination=dest, acquireMode=1);
+ session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);//messages
+ session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
+ }
+};
+
+// ########### START HERE ####################################
+
+BOOST_AUTO_TEST_CASE(testXmlBinding) {
+ ClientSessionFixture f;
+
+ Session session = f.connection.newSession(ASYNC);
+ SubscriptionManager subscriptions(session);
+ SubscribedLocalQueue localQueue(subscriptions);
+
+ session.exchangeDeclare(qpid::client::arg::exchange="xml", qpid::client::arg::type="xml");
+ session.queueDeclare(qpid::client::arg::queue="odd_blue");
+ subscriptions.subscribe(localQueue, "odd_blue");
+
+ FieldTable binding;
+ binding.setString("xquery", "declare variable $color external;"
+ "(./message/id mod 2 = 1) and ($color = 'blue')");
+ session.exchangeBind(qpid::client::arg::exchange="xml", qpid::client::arg::queue="odd_blue", qpid::client::arg::bindingKey="query_name", qpid::client::arg::arguments=binding);
+
+ Message message;
+ message.getDeliveryProperties().setRoutingKey("query_name");
+
+ message.getHeaders().setString("color", "blue");
+ string m = "<message><id>1</id></message>";
+ message.setData(m);
+
+ session.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="xml");
+
+ Message m2 = localQueue.get();
+ BOOST_CHECK_EQUAL(m, m2.getData());
+}
+
+//### Test: Bad XML does not kill the server
+
+//### Test: Bad XQuery does not kill the server
+
+//### Test: Bindings persist, surviving broker restart
+
+QPID_AUTO_TEST_SUITE_END()
+