diff options
| author | Gordon Sim <gsim@apache.org> | 2009-11-19 22:08:30 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-11-19 22:08:30 +0000 |
| commit | 003364c3f387a158cf7a7e3cdfd5e2ddc38eae57 (patch) | |
| tree | 48fe82188d587c5ab90f98ea51f53a87c39f84e5 /cpp/src | |
| parent | 4e7097e788097ffa50ff0a13ba13ee2d137f70ca (diff) | |
| download | qpid-python-003364c3f387a158cf7a7e3cdfd5e2ddc38eae57.tar.gz | |
QPID-664: Refine address resolution; if type not specified, default to queue as per python client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@882323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 111 | ||||
| -rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 2 |
2 files changed, 82 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index e215d03937..b1e6a9b671 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -30,6 +30,7 @@ #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" +#include "qpid/framing/ExchangeBoundResult.h" #include "qpid/framing/ExchangeQueryResult.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/QueueQueryResult.h" @@ -48,6 +49,7 @@ using qpid::messaging::Address; using qpid::messaging::Filter; using qpid::messaging::InvalidAddress; using qpid::messaging::Variant; +using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; using qpid::framing::QueueQueryResult; @@ -182,7 +184,7 @@ class QueueSource : public Queue, public MessageSource class Subscription : public Exchange, public MessageSource { public: - Subscription(const Address&, const std::string& exchangeType); + Subscription(const Address&, const std::string& exchangeType=""); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); private: @@ -270,27 +272,47 @@ bool is_reliable(const Address& address) std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, const Address& address) { - //TODO: handle case where there exists a queue and an exchange of - //the same name (hence an unqualified address is ambiguous) - - //TODO: make sure specified address type gives sane error message - //if it does not match the configuration on server - - /** - if (Node::createEnabled(address, FOR_RECEIVER)) { - } else { - } - **/ - - if (isQueue(session, address)) { - std::auto_ptr<MessageSource> source(new QueueSource(address)); - QPID_LOG(debug, "resolved source address as queue: " << address); - return source; - } else { + ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name + if (address.getType() == TOPIC_ADDRESS) { + std::auto_ptr<MessageSource> source(new Subscription(address)); + QPID_LOG(debug, "treating source address as topic: " << address); + return source; + } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "treating source address as queue: " << address); + return source; + } else { + throw InvalidAddress("Unrecognised type: " + address.getType()); + } + } else if (result.getQueueNotFound()) { + //only an exchange exists with that name qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); QPID_LOG(debug, "resolved source address as topic: " << address); return source; + } else if (result.getExchangeNotFound()) { + //only an queue exists with that name + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "resolved source address as queue: " << address); + return source; + } else { + //both a queue and exchange exist for that name + if (address.getType() == TOPIC_ADDRESS) { + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); + std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); + QPID_LOG(debug, "resolved source address as topic: " << address); + return source; + } else if (address.getType() == QUEUE_ADDRESS) { + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "resolved source address as queue: " << address); + return source; + } else if (address.getType().empty()) { + throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); + } else { + throw InvalidAddress("Unrecognised type: " + address.getType()); + } } } @@ -298,21 +320,46 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, const qpid::messaging::Address& address) { - std::auto_ptr<MessageSink> sink; - if (isQueue(session, address)) { - sink = std::auto_ptr<MessageSink>(new QueueSink(address)); + ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name + if (address.getType() == TOPIC_ADDRESS) { + std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); + QPID_LOG(debug, "treating target address as topic: " << address); + return sink; + } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { + std::auto_ptr<MessageSink> sink(new QueueSink(address)); + QPID_LOG(debug, "treating target address as queue: " << address); + return sink; + } else { + throw InvalidAddress("Unrecognised type: " + address.getType()); + } + } else if (result.getQueueNotFound()) { + //only an exchange exists with that name + std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); + QPID_LOG(debug, "resolved target address as topic: " << address); + return sink; + } else if (result.getExchangeNotFound()) { + //only an queue exists with that name + std::auto_ptr<MessageSink> sink(new QueueSink(address)); + QPID_LOG(debug, "resolved target address as queue: " << address); + return sink; } else { - if (isTopic(session, address)) { - sink = std::auto_ptr<MessageSink>(new ExchangeSink(address)); + //both a queue and exchange exist for that name + if (address.getType() == TOPIC_ADDRESS) { + std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); + QPID_LOG(debug, "resolved target address as topic: " << address); + return sink; + } else if (address.getType() == QUEUE_ADDRESS) { + std::auto_ptr<MessageSink> sink(new QueueSink(address)); + QPID_LOG(debug, "resolved target address as queue: " << address); + return sink; + } else if (address.getType().empty()) { + throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); } else { - if (address.getType().empty()) { - throw InvalidAddress(QPID_MSG("Address not known: " << address)); - } else { - throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType())); - } + throw InvalidAddress("Unrecognised type: " + address.getType()); } } - return sink; } QueueSource::QueueSource(const Address& address) : @@ -590,8 +637,10 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) { + throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str()); + } catch (const std::exception& e) { + throw InvalidAddress(e.what()); } } } diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 09440cc183..1293ce9429 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -248,6 +248,7 @@ QPID_AUTO_TEST_CASE(testSenderError) MessagingFixture fix; ScopedSuppressLogging sl; BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress); + fix.session = fix.connection.newSession(); BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver, type:queue}"), qpid::messaging::InvalidAddress); } @@ -257,6 +258,7 @@ QPID_AUTO_TEST_CASE(testReceiverError) MessagingFixture fix; ScopedSuppressLogging sl; BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress); + fix.session = fix.connection.newSession(); BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender, type:queue}"), qpid::messaging::InvalidAddress); } |
