summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp44
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h20
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h8
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp17
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h5
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp19
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp24
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h3
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp25
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h2
-rw-r--r--cpp/src/tests/ExchangeTest.cpp96
-rw-r--r--python/tests_0-9/query.py155
17 files changed, 434 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index e099c4a56d..93a3a319ac 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -40,6 +40,7 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
channelHandler(*this),
connectionHandler(*this),
exchangeHandler(*this),
+ bindingHandler(*this),
messageHandler(*this),
queueHandler(*this),
txHandler(*this),
@@ -160,6 +161,49 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u
if(!nowait) client.deleteOk(context.getRequestId());
}
+void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name)
+{
+ try {
+ Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId());
+ } catch (const ChannelException& e) {
+ client.queryOk("", false, true, FieldTable(), context.getRequestId());
+ }
+}
+
+void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context,
+ u_int16_t /*ticket*/,
+ const std::string& exchangeName,
+ const std::string& queueName,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ Exchange::shared_ptr exchange;
+ try {
+ exchange = broker.getExchanges().get(exchangeName);
+ } catch (const ChannelException&) {}
+
+ Queue::shared_ptr queue;
+ if (!queueName.empty()) {
+ queue = broker.getQueues().find(queueName);
+ }
+
+ if (!exchange) {
+ client.queryOk(true, false, false, false, false, context.getRequestId());
+ } else if (!queueName.empty() && !queue) {
+ client.queryOk(false, true, false, false, false, context.getRequestId());
+ } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
+ client.queryOk(false, false, false, false, false, context.getRequestId());
+ } else {
+ //need to test each specified option individually
+ bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
+ bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
+ bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
+
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId());
+ }
+}
+
void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 6b54575776..01ece30cfa 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -63,6 +63,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ BindingHandler* getBindingHandler() { return &bindingHandler; }
QueueHandler* getQueueHandler() { return &queueHandler; }
TxHandler* getTxHandler() { return &txHandler; }
MessageHandler* getMessageHandler() { return &messageHandler; }
@@ -142,6 +143,24 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(const framing::MethodContext& context, uint16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
+ void query(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const string& name);
+ };
+
+ class BindingHandlerImpl :
+ public BindingHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Binding>
+ {
+ public:
+ BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void query(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& exchange,
+ const std::string& queue,
+ const std::string& routingKey,
+ const framing::FieldTable& arguments);
};
class QueueHandlerImpl :
@@ -214,6 +233,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
ExchangeHandlerImpl exchangeHandler;
+ BindingHandlerImpl bindingHandler;
MessageHandlerImpl messageHandler;
QueueHandlerImpl queueHandler;
TxHandlerImpl txHandler;
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index 62c82aa935..968775cfe5 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -39,6 +39,8 @@ namespace qpid {
const string name;
const bool durable;
qpid::framing::FieldTable args;
+ boost::shared_ptr<Exchange> alternate;
+ uint32_t alternateUsers;
mutable uint64_t persistenceId;
public:
@@ -53,9 +55,15 @@ namespace qpid {
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
+ Exchange::shared_ptr getAlternate() { return alternate; }
+ void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; }
+ void incAlternateUsers() { alternateUsers++; }
+ void decAlternateUsers() { alternateUsers--; }
+
virtual string getType() const = 0;
virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
//PersistableExchange:
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 58a8c85fcb..f11766e7ec 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -23,6 +23,8 @@
#include "qpid/log/Statement.h"
#include "BrokerQueue.h"
+#include "BrokerExchange.h"
+#include "DeliverableMessage.h"
#include "MessageStore.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
@@ -233,6 +235,16 @@ void Queue::configure(const FieldTable& _settings)
void Queue::destroy()
{
+ if (alternateExchange.get()) {
+ Mutex::ScopedLock locker(lock);
+ while(!messages.empty()){
+ DeliverableMessage msg(messages.front());
+ alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+ &(msg.getMessage().getApplicationHeaders()));
+ pop();
+ }
+ }
+
if (store) {
store->destroy(*this);
}
@@ -289,3 +301,8 @@ Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer)
return result.first;
}
+
+void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
+{
+ alternateExchange = exchange;
+}
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index efb31ba216..ee472db97b 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -42,6 +42,7 @@ namespace qpid {
namespace broker {
class MessageStore;
class QueueRegistry;
+ class Exchange;
/**
* Thrown when exclusive access would be violated.
@@ -74,6 +75,7 @@ namespace qpid {
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
+ boost::shared_ptr<Exchange> alternateExchange;
void pop();
void push(Message::shared_ptr& msg);
@@ -142,6 +144,9 @@ namespace qpid {
const QueuePolicy* const getPolicy();
+ void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
+
+
//PersistableQueue support:
uint64_t getPersistenceId() const;
void setPersistenceId(uint64_t persistenceId) const;
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index 0c2e46ccce..a713f306a8 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -31,3 +31,7 @@ void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
queue->deliver(msg);
}
+Message& DeliverableMessage::getMessage()
+{
+ return *msg;
+}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 43e738a96a..e8c4f5ba19 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -32,6 +32,7 @@ namespace qpid {
public:
DeliverableMessage(Message::shared_ptr& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
+ Message& getMessage();
virtual ~DeliverableMessage(){}
};
}
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 01817144d6..7b516414c0 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -69,6 +69,25 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
}
}
+
+bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ if (routingKey) {
+ Bindings::iterator i = bindings.find(*routingKey);
+ return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+ } else if (!queue) {
+ //if no queue or routing key is specified, just report whether any bindings exist
+ return bindings.size() > 0;
+ } else {
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
DirectExchange::~DirectExchange(){
}
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index a06da10f6f..059f69a9ad 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -32,7 +32,9 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ typedef std::vector<Queue::shared_ptr> Queues;
+ typedef std::map<string, Queues > Bindings;
+ Bindings bindings;
qpid::sys::Mutex lock;
public:
@@ -50,6 +52,8 @@ namespace broker {
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~DirectExchange();
};
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 5f3a66d115..79702394ad 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -58,6 +58,12 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const
}
}
+bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
+{
+ return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+}
+
+
FanOutExchange::~FanOutExchange() {}
const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index cfab710a35..4ebd10d3cf 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -51,6 +51,8 @@ class FanOutExchange : public virtual Exchange {
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~FanOutExchange();
};
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index c33d638fce..a9405e1f6d 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -80,6 +80,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
}
}
+
+bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
+{
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if ( (!args || equal(i->first, *args)) && i->second == queue) {
+ return true;
+ }
+ }
+ return false;
+}
+
HeadersExchange::~HeadersExchange() {}
const std::string HeadersExchange::typeName("headers");
@@ -129,5 +140,18 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
}
}
+bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
+ typedef FieldTable::ValueMap Map;
+ for (Map::const_iterator i = a.getMap().begin();
+ i != a.getMap().end();
+ ++i)
+ {
+ Map::const_iterator j = b.getMap().find(i->first);
+ if (j == b.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ return true;
+}
+
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index e35ef21ccd..51aa30f56d 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -54,9 +54,12 @@ class HeadersExchange : public virtual Exchange {
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~HeadersExchange();
static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+ static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
};
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 4ad1607aa2..80c207b56a 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -162,6 +162,31 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel
}
}
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ if (routingKey && queue) {
+ TopicPattern key(*routingKey);
+ return isBound(queue, key);
+ } else if (!routingKey && !queue) {
+ return bindings.size() > 0;
+ } else if (routingKey) {
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(*routingKey)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ Queue::vector& qv(i->second);
+ if (find(qv.begin(), qv.end(), queue) != qv.end()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 2220e0112b..4b294de5ae 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -92,6 +92,8 @@ class TopicExchange : public virtual Exchange{
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~TopicExchange();
};
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 16a0da0746..0033aa7529 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -19,10 +19,12 @@
*
*/
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/BrokerExchange.h"
#include "qpid/broker/BrokerQueue.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid_test_plugin.h"
#include <iostream>
@@ -36,6 +38,7 @@ class ExchangeTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ExchangeTest);
CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST(testIsBound);
CPPUNIT_TEST_SUITE_END();
public:
@@ -66,6 +69,95 @@ class ExchangeTest : public CppUnit::TestCase
direct.route(msg, "abc", 0);
}
+
+ void testIsBound()
+ {
+ Queue::shared_ptr a(new Queue("a", true));
+ Queue::shared_ptr b(new Queue("b", true));
+ Queue::shared_ptr c(new Queue("c", true));
+ Queue::shared_ptr d(new Queue("d", true));
+
+ string k1("abc");
+ string k2("def");
+ string k3("xyz");
+
+ FanOutExchange fanout("fanout");
+ fanout.bind(a, "", 0);
+ fanout.bind(b, "", 0);
+ fanout.bind(c, "", 0);
+
+ CPPUNIT_ASSERT(fanout.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(fanout.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(fanout.isBound(c, 0, 0));
+ CPPUNIT_ASSERT(!fanout.isBound(d, 0, 0));
+
+ DirectExchange direct("direct");
+ direct.bind(a, k1, 0);
+ direct.bind(a, k3, 0);
+ direct.bind(b, k2, 0);
+ direct.bind(c, k1, 0);
+
+ CPPUNIT_ASSERT(direct.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(direct.isBound(a, &k1, 0));
+ CPPUNIT_ASSERT(direct.isBound(a, &k3, 0));
+ CPPUNIT_ASSERT(!direct.isBound(a, &k2, 0));
+ CPPUNIT_ASSERT(direct.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(direct.isBound(b, &k2, 0));
+ CPPUNIT_ASSERT(direct.isBound(c, &k1, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k1, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k2, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k3, 0));
+
+ TopicExchange topic("topic");
+ topic.bind(a, k1, 0);
+ topic.bind(a, k3, 0);
+ topic.bind(b, k2, 0);
+ topic.bind(c, k1, 0);
+
+ CPPUNIT_ASSERT(topic.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(topic.isBound(a, &k1, 0));
+ CPPUNIT_ASSERT(topic.isBound(a, &k3, 0));
+ CPPUNIT_ASSERT(!topic.isBound(a, &k2, 0));
+ CPPUNIT_ASSERT(topic.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(topic.isBound(b, &k2, 0));
+ CPPUNIT_ASSERT(topic.isBound(c, &k1, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k1, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k2, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k3, 0));
+
+ HeadersExchange headers("headers");
+ FieldTable args1;
+ args1.setString("x-match", "all");
+ args1.setString("a", "A");
+ args1.setInt("b", 1);
+ FieldTable args2;
+ args2.setString("x-match", "any");
+ args2.setString("a", "A");
+ args2.setInt("b", 1);
+ FieldTable args3;
+ args3.setString("x-match", "any");
+ args3.setString("c", "C");
+ args3.setInt("b", 6);
+
+ headers.bind(a, "", &args1);
+ headers.bind(a, "", &args3);
+ headers.bind(b, "", &args2);
+ headers.bind(c, "", &args1);
+
+ CPPUNIT_ASSERT(headers.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(headers.isBound(a, 0, &args1));
+ CPPUNIT_ASSERT(headers.isBound(a, 0, &args3));
+ CPPUNIT_ASSERT(!headers.isBound(a, 0, &args2));
+ CPPUNIT_ASSERT(headers.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(headers.isBound(b, 0, &args2));
+ CPPUNIT_ASSERT(headers.isBound(c, 0, &args1));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args1));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args2));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args3));
+ }
};
// Make this test suite a plugin.
diff --git a/python/tests_0-9/query.py b/python/tests_0-9/query.py
new file mode 100644
index 0000000000..69111f03fa
--- /dev/null
+++ b/python/tests_0-9/query.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueryTests(TestBase):
+ """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ channel = self.channel
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", channel.exchange_query(name="").type)
+ #declare an exchange
+ channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = channel.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assertEqual(False, response.durable)
+ self.assertEqual(False, response.not_found)
+ #delete the exchange
+ channel.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+ def test_binding_query_direct(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.direct")
+
+ def test_binding_query_topic(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.topic")
+
+ def binding_query_with_key(self, exchange_name):
+ channel = self.channel
+ #setup: create two queues
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+
+ channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+ def test_binding_query_fanout(self):
+ """
+ Test that the binding_query method works as expected with fanout exchange
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+ response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+
+ def test_binding_query_header(self):
+ """
+ Test that the binding_query method works as expected with headers exchanges
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ response = channel.binding_query(exchange="amq.match", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+
+