diff options
author | Gordon Sim <gsim@apache.org> | 2007-06-27 12:39:49 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-06-27 12:39:49 +0000 |
commit | 5a88e6f19bddc0b9f6da4712b616f5f08b4dec25 (patch) | |
tree | 6d32b3a3ef5416bb40027852b98eda9389cbbb6c | |
parent | 4add83a2d2482a51f447ca71d2385ae19ea173fa (diff) | |
download | qpid-python-5a88e6f19bddc0b9f6da4712b616f5f08b4dec25.tar.gz |
Added preview of exchange- and binding- query methods that have been approved for 0-10.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551144 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerExchange.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliverableMessage.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliverableMessage.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 96 | ||||
-rw-r--r-- | python/tests_0-9/query.py | 155 |
17 files changed, 434 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index e099c4a56d..93a3a319ac 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -40,6 +40,7 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : channelHandler(*this), connectionHandler(*this), exchangeHandler(*this), + bindingHandler(*this), messageHandler(*this), queueHandler(*this), txHandler(*this), @@ -160,6 +161,49 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u if(!nowait) client.deleteOk(context.getRequestId()); } +void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name) +{ + try { + Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId()); + } catch (const ChannelException& e) { + client.queryOk("", false, true, FieldTable(), context.getRequestId()); + } +} + +void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context, + u_int16_t /*ticket*/, + const std::string& exchangeName, + const std::string& queueName, + const std::string& key, + const framing::FieldTable& args) +{ + Exchange::shared_ptr exchange; + try { + exchange = broker.getExchanges().get(exchangeName); + } catch (const ChannelException&) {} + + Queue::shared_ptr queue; + if (!queueName.empty()) { + queue = broker.getQueues().find(queueName); + } + + if (!exchange) { + client.queryOk(true, false, false, false, false, context.getRequestId()); + } else if (!queueName.empty() && !queue) { + client.queryOk(false, true, false, false, false, context.getRequestId()); + } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { + client.queryOk(false, false, false, false, false, context.getRequestId()); + } else { + //need to test each specified option individually + bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); + bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); + bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); + + client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId()); + } +} + void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 6b54575776..01ece30cfa 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -63,6 +63,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations ConnectionHandler* getConnectionHandler() { return &connectionHandler; } BasicHandler* getBasicHandler() { return &basicHandler; } ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } + BindingHandler* getBindingHandler() { return &bindingHandler; } QueueHandler* getQueueHandler() { return &queueHandler; } TxHandler* getTxHandler() { return &txHandler; } MessageHandler* getMessageHandler() { return &messageHandler; } @@ -142,6 +143,24 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& exchange, bool ifUnused, bool nowait); + void query(const framing::MethodContext& context, + u_int16_t ticket, + const string& name); + }; + + class BindingHandlerImpl : + public BindingHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Binding> + { + public: + BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + + void query(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& exchange, + const std::string& queue, + const std::string& routingKey, + const framing::FieldTable& arguments); }; class QueueHandlerImpl : @@ -214,6 +233,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations ChannelHandlerImpl channelHandler; ConnectionHandlerImpl connectionHandler; ExchangeHandlerImpl exchangeHandler; + BindingHandlerImpl bindingHandler; MessageHandlerImpl messageHandler; QueueHandlerImpl queueHandler; TxHandlerImpl txHandler; diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h index 62c82aa935..968775cfe5 100644 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ b/cpp/src/qpid/broker/BrokerExchange.h @@ -39,6 +39,8 @@ namespace qpid { const string name; const bool durable; qpid::framing::FieldTable args; + boost::shared_ptr<Exchange> alternate; + uint32_t alternateUsers; mutable uint64_t persistenceId; public: @@ -53,9 +55,15 @@ namespace qpid { bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } + Exchange::shared_ptr getAlternate() { return alternate; } + void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } + void incAlternateUsers() { alternateUsers++; } + void decAlternateUsers() { alternateUsers--; } + virtual string getType() const = 0; virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0; virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; //PersistableExchange: diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 58a8c85fcb..f11766e7ec 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -23,6 +23,8 @@ #include "qpid/log/Statement.h" #include "BrokerQueue.h" +#include "BrokerExchange.h" +#include "DeliverableMessage.h" #include "MessageStore.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" @@ -233,6 +235,16 @@ void Queue::configure(const FieldTable& _settings) void Queue::destroy() { + if (alternateExchange.get()) { + Mutex::ScopedLock locker(lock); + while(!messages.empty()){ + DeliverableMessage msg(messages.front()); + alternateExchange->route(msg, msg.getMessage().getRoutingKey(), + &(msg.getMessage().getApplicationHeaders())); + pop(); + } + } + if (store) { store->destroy(*this); } @@ -289,3 +301,8 @@ Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer) return result.first; } + +void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) +{ + alternateExchange = exchange; +} diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index efb31ba216..ee472db97b 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -42,6 +42,7 @@ namespace qpid { namespace broker { class MessageStore; class QueueRegistry; + class Exchange; /** * Thrown when exclusive access would be violated. @@ -74,6 +75,7 @@ namespace qpid { framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; + boost::shared_ptr<Exchange> alternateExchange; void pop(); void push(Message::shared_ptr& msg); @@ -142,6 +144,9 @@ namespace qpid { const QueuePolicy* const getPolicy(); + void setAlternateExchange(boost::shared_ptr<Exchange> exchange); + + //PersistableQueue support: uint64_t getPersistenceId() const; void setPersistenceId(uint64_t persistenceId) const; diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index 0c2e46ccce..a713f306a8 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -31,3 +31,7 @@ void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) queue->deliver(msg); } +Message& DeliverableMessage::getMessage() +{ + return *msg; +} diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 43e738a96a..e8c4f5ba19 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -32,6 +32,7 @@ namespace qpid { public: DeliverableMessage(Message::shared_ptr& msg); virtual void deliverTo(Queue::shared_ptr& queue); + Message& getMessage(); virtual ~DeliverableMessage(){} }; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 01817144d6..7b516414c0 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -69,6 +69,25 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie } } + +bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +{ + if (routingKey) { + Bindings::iterator i = bindings.find(*routingKey); + return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end()); + } else if (!queue) { + //if no queue or routing key is specified, just report whether any bindings exist + return bindings.size() > 0; + } else { + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) { + return true; + } + } + return false; + } +} + DirectExchange::~DirectExchange(){ } diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index a06da10f6f..059f69a9ad 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -32,7 +32,9 @@ namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ - std::map<string, std::vector<Queue::shared_ptr> > bindings; + typedef std::vector<Queue::shared_ptr> Queues; + typedef std::map<string, Queues > Bindings; + Bindings bindings; qpid::sys::Mutex lock; public: @@ -50,6 +52,8 @@ namespace broker { virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual ~DirectExchange(); }; } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 5f3a66d115..79702394ad 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -58,6 +58,12 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const } } +bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) +{ + return std::find(bindings.begin(), bindings.end(), queue) != bindings.end(); +} + + FanOutExchange::~FanOutExchange() {} const std::string FanOutExchange::typeName("fanout"); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index cfab710a35..4ebd10d3cf 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -51,6 +51,8 @@ class FanOutExchange : public virtual Exchange { virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual ~FanOutExchange(); }; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index c33d638fce..a9405e1f6d 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -80,6 +80,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons } } + +bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) +{ + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if ( (!args || equal(i->first, *args)) && i->second == queue) { + return true; + } + } + return false; +} + HeadersExchange::~HeadersExchange() {} const std::string HeadersExchange::typeName("headers"); @@ -129,5 +140,18 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { } } +bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) { + typedef FieldTable::ValueMap Map; + for (Map::const_iterator i = a.getMap().begin(); + i != a.getMap().end(); + ++i) + { + Map::const_iterator j = b.getMap().find(i->first); + if (j == b.getMap().end()) return false; + if (!match_values(*(i->second), *(j->second))) return false; + } + return true; +} + diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index e35ef21ccd..51aa30f56d 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -54,9 +54,12 @@ class HeadersExchange : public virtual Exchange { virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual ~HeadersExchange(); static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); + static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); }; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 4ad1607aa2..80c207b56a 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -162,6 +162,31 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel } } +bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +{ + if (routingKey && queue) { + TopicPattern key(*routingKey); + return isBound(queue, key); + } else if (!routingKey && !queue) { + return bindings.size() > 0; + } else if (routingKey) { + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first.match(*routingKey)) { + return true; + } + } + return false; + } else { + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + Queue::vector& qv(i->second); + if (find(qv.begin(), qv.end(), queue) != qv.end()) { + return true; + } + } + return false; + } +} + TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 2220e0112b..4b294de5ae 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -92,6 +92,8 @@ class TopicExchange : public virtual Exchange{ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual ~TopicExchange(); }; diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 16a0da0746..0033aa7529 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -19,10 +19,12 @@ * */ -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/DirectExchange.h" #include "qpid/broker/BrokerExchange.h" #include "qpid/broker/BrokerQueue.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" #include "qpid_test_plugin.h" #include <iostream> @@ -36,6 +38,7 @@ class ExchangeTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ExchangeTest); CPPUNIT_TEST(testMe); + CPPUNIT_TEST(testIsBound); CPPUNIT_TEST_SUITE_END(); public: @@ -66,6 +69,95 @@ class ExchangeTest : public CppUnit::TestCase direct.route(msg, "abc", 0); } + + void testIsBound() + { + Queue::shared_ptr a(new Queue("a", true)); + Queue::shared_ptr b(new Queue("b", true)); + Queue::shared_ptr c(new Queue("c", true)); + Queue::shared_ptr d(new Queue("d", true)); + + string k1("abc"); + string k2("def"); + string k3("xyz"); + + FanOutExchange fanout("fanout"); + fanout.bind(a, "", 0); + fanout.bind(b, "", 0); + fanout.bind(c, "", 0); + + CPPUNIT_ASSERT(fanout.isBound(a, 0, 0)); + CPPUNIT_ASSERT(fanout.isBound(b, 0, 0)); + CPPUNIT_ASSERT(fanout.isBound(c, 0, 0)); + CPPUNIT_ASSERT(!fanout.isBound(d, 0, 0)); + + DirectExchange direct("direct"); + direct.bind(a, k1, 0); + direct.bind(a, k3, 0); + direct.bind(b, k2, 0); + direct.bind(c, k1, 0); + + CPPUNIT_ASSERT(direct.isBound(a, 0, 0)); + CPPUNIT_ASSERT(direct.isBound(a, &k1, 0)); + CPPUNIT_ASSERT(direct.isBound(a, &k3, 0)); + CPPUNIT_ASSERT(!direct.isBound(a, &k2, 0)); + CPPUNIT_ASSERT(direct.isBound(b, 0, 0)); + CPPUNIT_ASSERT(direct.isBound(b, &k2, 0)); + CPPUNIT_ASSERT(direct.isBound(c, &k1, 0)); + CPPUNIT_ASSERT(!direct.isBound(d, 0, 0)); + CPPUNIT_ASSERT(!direct.isBound(d, &k1, 0)); + CPPUNIT_ASSERT(!direct.isBound(d, &k2, 0)); + CPPUNIT_ASSERT(!direct.isBound(d, &k3, 0)); + + TopicExchange topic("topic"); + topic.bind(a, k1, 0); + topic.bind(a, k3, 0); + topic.bind(b, k2, 0); + topic.bind(c, k1, 0); + + CPPUNIT_ASSERT(topic.isBound(a, 0, 0)); + CPPUNIT_ASSERT(topic.isBound(a, &k1, 0)); + CPPUNIT_ASSERT(topic.isBound(a, &k3, 0)); + CPPUNIT_ASSERT(!topic.isBound(a, &k2, 0)); + CPPUNIT_ASSERT(topic.isBound(b, 0, 0)); + CPPUNIT_ASSERT(topic.isBound(b, &k2, 0)); + CPPUNIT_ASSERT(topic.isBound(c, &k1, 0)); + CPPUNIT_ASSERT(!topic.isBound(d, 0, 0)); + CPPUNIT_ASSERT(!topic.isBound(d, &k1, 0)); + CPPUNIT_ASSERT(!topic.isBound(d, &k2, 0)); + CPPUNIT_ASSERT(!topic.isBound(d, &k3, 0)); + + HeadersExchange headers("headers"); + FieldTable args1; + args1.setString("x-match", "all"); + args1.setString("a", "A"); + args1.setInt("b", 1); + FieldTable args2; + args2.setString("x-match", "any"); + args2.setString("a", "A"); + args2.setInt("b", 1); + FieldTable args3; + args3.setString("x-match", "any"); + args3.setString("c", "C"); + args3.setInt("b", 6); + + headers.bind(a, "", &args1); + headers.bind(a, "", &args3); + headers.bind(b, "", &args2); + headers.bind(c, "", &args1); + + CPPUNIT_ASSERT(headers.isBound(a, 0, 0)); + CPPUNIT_ASSERT(headers.isBound(a, 0, &args1)); + CPPUNIT_ASSERT(headers.isBound(a, 0, &args3)); + CPPUNIT_ASSERT(!headers.isBound(a, 0, &args2)); + CPPUNIT_ASSERT(headers.isBound(b, 0, 0)); + CPPUNIT_ASSERT(headers.isBound(b, 0, &args2)); + CPPUNIT_ASSERT(headers.isBound(c, 0, &args1)); + CPPUNIT_ASSERT(!headers.isBound(d, 0, 0)); + CPPUNIT_ASSERT(!headers.isBound(d, 0, &args1)); + CPPUNIT_ASSERT(!headers.isBound(d, 0, &args2)); + CPPUNIT_ASSERT(!headers.isBound(d, 0, &args3)); + } }; // Make this test suite a plugin. diff --git a/python/tests_0-9/query.py b/python/tests_0-9/query.py new file mode 100644 index 0000000000..69111f03fa --- /dev/null +++ b/python/tests_0-9/query.py @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class QueryTests(TestBase): + """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" + + def test_exchange_query(self): + """ + Test that the exchange_query method works as expected + """ + channel = self.channel + #check returned type for the standard exchanges + self.assertEqual("direct", channel.exchange_query(name="amq.direct").type) + self.assertEqual("topic", channel.exchange_query(name="amq.topic").type) + self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type) + self.assertEqual("headers", channel.exchange_query(name="amq.match").type) + self.assertEqual("direct", channel.exchange_query(name="").type) + #declare an exchange + channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) + #check that the result of a query is as expected + response = channel.exchange_query(name="my-test-exchange") + self.assertEqual("direct", response.type) + self.assertEqual(False, response.durable) + self.assertEqual(False, response.not_found) + #delete the exchange + channel.exchange_delete(exchange="my-test-exchange") + #check that the query now reports not-found + self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) + + def test_binding_query_direct(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.direct") + + def test_binding_query_topic(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.topic") + + def binding_query_with_key(self, exchange_name): + channel = self.channel + #setup: create two queues + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + + channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key") + + # test detection of any binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test detection of specific binding to any queue + response = channel.binding_query(exchange=exchange_name, routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.key_not_matched) + + # test detection of specific binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.key_not_matched) + + # test matched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + # test unmatched queue, matched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found) + + + def test_binding_query_fanout(self): + """ + Test that the binding_query method works as expected with fanout exchange + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + channel.queue_bind(exchange="amq.fanout", queue="used-queue") + + response = channel.binding_query(exchange="amq.fanout", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + + def test_binding_query_header(self): + """ + Test that the binding_query method works as expected with headers exchanges + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) + + response = channel.binding_query(exchange="amq.match", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + + |