summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-19 22:08:30 +0000
committerGordon Sim <gsim@apache.org>2009-11-19 22:08:30 +0000
commit003364c3f387a158cf7a7e3cdfd5e2ddc38eae57 (patch)
tree48fe82188d587c5ab90f98ea51f53a87c39f84e5 /cpp/src
parent4e7097e788097ffa50ff0a13ba13ee2d137f70ca (diff)
downloadqpid-python-003364c3f387a158cf7a7e3cdfd5e2ddc38eae57.tar.gz
QPID-664: Refine address resolution; if type not specified, default to queue as per python client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@882323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp111
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp2
2 files changed, 82 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index e215d03937..b1e6a9b671 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -30,6 +30,7 @@
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeBoundResult.h"
#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/QueueQueryResult.h"
@@ -48,6 +49,7 @@ using qpid::messaging::Address;
using qpid::messaging::Filter;
using qpid::messaging::InvalidAddress;
using qpid::messaging::Variant;
+using qpid::framing::ExchangeBoundResult;
using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
using qpid::framing::QueueQueryResult;
@@ -182,7 +184,7 @@ class QueueSource : public Queue, public MessageSource
class Subscription : public Exchange, public MessageSource
{
public:
- Subscription(const Address&, const std::string& exchangeType);
+ Subscription(const Address&, const std::string& exchangeType="");
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
@@ -270,27 +272,47 @@ bool is_reliable(const Address& address)
std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
const Address& address)
{
- //TODO: handle case where there exists a queue and an exchange of
- //the same name (hence an unqualified address is ambiguous)
-
- //TODO: make sure specified address type gives sane error message
- //if it does not match the configuration on server
-
- /**
- if (Node::createEnabled(address, FOR_RECEIVER)) {
- } else {
- }
- **/
-
- if (isQueue(session, address)) {
- std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "resolved source address as queue: " << address);
- return source;
- } else {
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name
+ if (address.getType() == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new Subscription(address));
+ QPID_LOG(debug, "treating source address as topic: " << address);
+ return source;
+ } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "treating source address as queue: " << address);
+ return source;
+ } else {
+ throw InvalidAddress("Unrecognised type: " + address.getType());
+ }
+ } else if (result.getQueueNotFound()) {
+ //only an exchange exists with that name
qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
QPID_LOG(debug, "resolved source address as topic: " << address);
return source;
+ } else if (result.getExchangeNotFound()) {
+ //only an queue exists with that name
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "resolved source address as queue: " << address);
+ return source;
+ } else {
+ //both a queue and exchange exist for that name
+ if (address.getType() == TOPIC_ADDRESS) {
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
+ std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
+ QPID_LOG(debug, "resolved source address as topic: " << address);
+ return source;
+ } else if (address.getType() == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "resolved source address as queue: " << address);
+ return source;
+ } else if (address.getType().empty()) {
+ throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
+ } else {
+ throw InvalidAddress("Unrecognised type: " + address.getType());
+ }
}
}
@@ -298,21 +320,46 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
const qpid::messaging::Address& address)
{
- std::auto_ptr<MessageSink> sink;
- if (isQueue(session, address)) {
- sink = std::auto_ptr<MessageSink>(new QueueSink(address));
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name
+ if (address.getType() == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
+ QPID_LOG(debug, "treating target address as topic: " << address);
+ return sink;
+ } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ std::auto_ptr<MessageSink> sink(new QueueSink(address));
+ QPID_LOG(debug, "treating target address as queue: " << address);
+ return sink;
+ } else {
+ throw InvalidAddress("Unrecognised type: " + address.getType());
+ }
+ } else if (result.getQueueNotFound()) {
+ //only an exchange exists with that name
+ std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
+ QPID_LOG(debug, "resolved target address as topic: " << address);
+ return sink;
+ } else if (result.getExchangeNotFound()) {
+ //only an queue exists with that name
+ std::auto_ptr<MessageSink> sink(new QueueSink(address));
+ QPID_LOG(debug, "resolved target address as queue: " << address);
+ return sink;
} else {
- if (isTopic(session, address)) {
- sink = std::auto_ptr<MessageSink>(new ExchangeSink(address));
+ //both a queue and exchange exist for that name
+ if (address.getType() == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
+ QPID_LOG(debug, "resolved target address as topic: " << address);
+ return sink;
+ } else if (address.getType() == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new QueueSink(address));
+ QPID_LOG(debug, "resolved target address as queue: " << address);
+ return sink;
+ } else if (address.getType().empty()) {
+ throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
} else {
- if (address.getType().empty()) {
- throw InvalidAddress(QPID_MSG("Address not known: " << address));
- } else {
- throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType()));
- }
+ throw InvalidAddress("Unrecognised type: " + address.getType());
}
}
- return sink;
}
QueueSource::QueueSource(const Address& address) :
@@ -590,8 +637,10 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str());
+ } catch (const std::exception& e) {
+ throw InvalidAddress(e.what());
}
}
}
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 09440cc183..1293ce9429 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -248,6 +248,7 @@ QPID_AUTO_TEST_CASE(testSenderError)
MessagingFixture fix;
ScopedSuppressLogging sl;
BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ fix.session = fix.connection.newSession();
BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver, type:queue}"),
qpid::messaging::InvalidAddress);
}
@@ -257,6 +258,7 @@ QPID_AUTO_TEST_CASE(testReceiverError)
MessagingFixture fix;
ScopedSuppressLogging sl;
BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ fix.session = fix.connection.newSession();
BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender, type:queue}"),
qpid::messaging::InvalidAddress);
}