diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 726 |
1 files changed, 520 insertions, 206 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f51a96efd9..14b5448a34 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -20,18 +20,24 @@ */ #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" #include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/messaging/Address.h" -#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/Variant.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" +#include "qpid/framing/ExchangeQueryResult.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/QueueQueryResult.h" #include "qpid/framing/ReplyTo.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> namespace qpid { namespace client { @@ -40,61 +46,145 @@ namespace amqp0_10 { using qpid::Exception; using qpid::messaging::Address; using qpid::messaging::Filter; +using qpid::messaging::InvalidAddress; using qpid::messaging::Variant; +using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; +using qpid::framing::QueueQueryResult; using qpid::framing::ReplyTo; +using qpid::framing::Uuid; using namespace qpid::framing::message; +using namespace boost::assign; namespace{ -const Variant EMPTY_VARIANT; const FieldTable EMPTY_FIELD_TABLE; const std::string EMPTY_STRING; //option names const std::string BROWSE("browse"); const std::string EXCLUSIVE("exclusive"); -const std::string MODE("mode"); -const std::string NAME("name"); -const std::string UNACKNOWLEDGED("unacknowledged"); +const std::string NO_LOCAL("no-local"); +const std::string FILTER("filter"); +const std::string RELIABILITY("reliability"); +const std::string NAME("subscription-name"); +const std::string NODE_PROPERTIES("node-properties"); + +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); const std::string QUEUE_ADDRESS("queue"); const std::string TOPIC_ADDRESS("topic"); -const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+"); -const std::string DIVIDER("/"); -const std::string SIMPLE_SUBSCRIPTION("simple"); -const std::string RELIABLE_SUBSCRIPTION("reliable"); +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); const std::string DURABLE_SUBSCRIPTION("durable"); +const std::string DURABLE("durable"); + +const std::string TOPIC_EXCHANGE("topic"); +const std::string FANOUT_EXCHANGE("fanout"); +const std::string DIRECT_EXCHANGE("direct"); +const std::string HEADERS_EXCHANGE("headers"); +const std::string XML_EXCHANGE("xml"); +const std::string WILDCARD_ANY("*"); } -class QueueSource : public MessageSource +//some amqp 0-10 specific options +namespace xamqp{ +const std::string AUTO_DELETE("x-amqp0-10-auto-delete"); +const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type"); +const std::string EXCLUSIVE("x-amqp0-10-exclusive"); +const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange"); +const std::string ARGUMENTS("x-amqp0-10-arguments"); +const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments"); +const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-queue-arguments"); +} + +class Node +{ + protected: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + Node(const Address& address); + + const std::string name; + Variant createPolicy; + Variant assertPolicy; + Variant deletePolicy; + + static bool enabled(const Variant& policy, CheckMode mode); + static bool createEnabled(const Address& address, CheckMode mode); + static void convert(const Variant& option, FieldTable& arguments); + static std::vector<std::string> RECEIVER_MODES; + static std::vector<std::string> SENDER_MODES; +}; + +class Queue : protected Node { public: - QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, - bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE); + Queue(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + private: + bool durable; + bool autoDelete; + bool exclusive; + std::string alternateExchange; + FieldTable arguments; + + void configure(const Address&); +}; + +class Exchange : protected Node +{ + public: + Exchange(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + const std::string& getDesiredExchangeType() { return type; } + + private: + std::string type; + bool durable; + bool autoDelete; + std::string alternateExchange; + FieldTable arguments; + + void configure(const Address&); +}; + +class QueueSource : public Queue, public MessageSource +{ + public: + QueueSource(const Address& address); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); private: - const std::string name; const AcceptMode acceptMode; const AcquireMode acquireMode; const bool exclusive; - const FieldTable options; + FieldTable options; }; -class Subscription : public MessageSource +class Subscription : public Exchange, public MessageSource { public: - enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE}; - - Subscription(const std::string& name, SubscriptionMode mode = SIMPLE, - const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE); - void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + Subscription(const Address&, const std::string& exchangeType); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); - - static SubscriptionMode getMode(const std::string& mode); private: struct Binding { @@ -107,155 +197,138 @@ class Subscription : public MessageSource typedef std::vector<Binding> Bindings; - const std::string name; - const bool autoDelete; + const std::string queue; + const bool reliable; const bool durable; - const FieldTable queueOptions; - const FieldTable subscriptionOptions; + FieldTable queueOptions; + FieldTable subscriptionOptions; Bindings bindings; - std::string queue; + + void bindSpecial(const std::string& exchangeType); + void bind(const Variant& filter); + void bind(const Variant::Map& filter); + void bind(const Variant::List& filter); + void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + static std::string getSubscriptionName(const std::string& base, const Variant& name); }; -class Exchange : public MessageSink +class ExchangeSink : public Exchange, public MessageSink { public: - Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, - bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, - const FieldTable& options = EMPTY_FIELD_TABLE); + ExchangeSink(const Address& name); void declare(qpid::client::AsyncSession& session, const std::string& name); void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: - const std::string name; const std::string defaultSubject; - const bool passive; - const std::string type; - const bool durable; - const FieldTable options; }; -class QueueSink : public MessageSink +class QueueSink : public Queue, public MessageSink { public: - QueueSink(const std::string& name, bool passive=true, bool exclusive=false, - bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); + QueueSink(const Address& name); void declare(qpid::client::AsyncSession& session, const std::string& name); void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: - const std::string name; - const bool passive; - const bool exclusive; - const bool autoDelete; - const bool durable; - const FieldTable options; }; bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); -bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject); +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address); -const Variant& getOption(const std::string& key, const Variant::Map& options) +bool in(const Variant& value, const std::vector<std::string>& choices) { - Variant::Map::const_iterator i = options.find(key); - if (i == options.end()) return EMPTY_VARIANT; - else return i->second; + if (!value.isVoid()) { + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value.asString() == *i) return true; + } + } + return false; +} + +bool getReceiverPolicy(const Address& address, const std::string& key) +{ + return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER)); +} + +bool getSenderPolicy(const Address& address, const std::string& key) +{ + return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER)); +} + +bool is_unreliable(const Address& address) +{ + return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); +} + +bool is_reliable(const Address& address) +{ + return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, - const Address& address, - const Filter* filter, - const Variant::Map& options) + const Address& address) { //TODO: handle case where there exists a queue and an exchange of //the same name (hence an unqualified address is ambiguous) //TODO: make sure specified address type gives sane error message - //if it does npt match the configuration on server + //if it does not match the configuration on server + + /** + if (Node::createEnabled(address, FOR_RECEIVER)) { + } else { + } + **/ if (isQueue(session, address)) { - //TODO: support auto-created queue as source, if requested by specific option - - AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; - AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED; - bool exclusive = getOption(EXCLUSIVE, options).asBool(); - FieldTable arguments; - //TODO: extract subscribe arguments from options (e.g. either - //filter out already processed keys and send the rest, or have - //a nested map) - - std::auto_ptr<MessageSource> source = - std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments)); + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "resolved source address as queue: " << address); return source; } else { - //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important) - std::auto_ptr<Subscription> bindings = - std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), - Subscription::getMode(getOption(MODE, options).asString()))); - - qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value); - if (result.getNotFound()) { - throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); - } else if (result.getType() == "topic") { - if (filter) { - if (filter->type != Filter::WILDCARD) { - throw qpid::framing::NotImplementedException( - QPID_MSG("Filters of type " << filter->type << " not supported by address " << address)); - - } - for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) { - bindings->add(address.value, *i, qpid::framing::FieldTable()); - } - } else { - //default is to receive all messages - bindings->add(address.value, "*", qpid::framing::FieldTable()); - } - } else if (result.getType() == "fanout") { - if (filter) { - throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address)); - } - bindings->add(address.value, address.value, qpid::framing::FieldTable()); - } else if (result.getType() == "direct") { - //TODO: ???? - } else { - //TODO: xml and headers exchanges - throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address)); - } - std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release()); + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); + std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); + QPID_LOG(debug, "resolved source address as topic: " << address); return source; } } std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, - const qpid::messaging::Address& address, - const qpid::messaging::Variant::Map& /*options*/) + const qpid::messaging::Address& address) { std::auto_ptr<MessageSink> sink; if (isQueue(session, address)) { - //TODO: support for auto-created queues as sink - sink = std::auto_ptr<MessageSink>(new QueueSink(address.value)); + sink = std::auto_ptr<MessageSink>(new QueueSink(address)); } else { - std::string subject; - if (isTopic(session, address, subject)) { - //TODO: support for auto-created exchanges as sink - sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject)); + if (isTopic(session, address)) { + sink = std::auto_ptr<MessageSink>(new ExchangeSink(address)); } else { - if (address.type.empty()) { - throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + if (address.getType().empty()) { + throw InvalidAddress(QPID_MSG("Address not known: " << address)); } else { - throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type)); + throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType())); } } } return sink; } -QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) : - name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {} +QueueSource::QueueSource(const Address& address) : + Queue(address), + acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), + acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), + exclusive(address.getOption(EXCLUSIVE).asBool()) +{ + //extract subscription arguments from address options + convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options); +} void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) { + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); session.messageSubscribe(arg::queue=name, arg::destination=destination, arg::acceptMode=acceptMode, @@ -267,11 +340,48 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) { session.messageCancel(destination); + checkDelete(session, FOR_RECEIVER); } -Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions) - : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), - queueOptions(qOptions), subscriptionOptions(sOptions) {} +std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name) +{ + if (name.isVoid()) { + return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); + } else { + return (boost::format("%1%_%2%") % base % name.asString()).str(); + } +} + +Subscription::Subscription(const Address& address, const std::string& exchangeType) + : Exchange(address), + queue(getSubscriptionName(name, address.getOption(NAME))), + reliable(is_reliable(address)), + durable(address.getOption(DURABLE_SUBSCRIPTION).asBool()) +{ + if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1); + convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions); + convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions); + + const Variant& filter = address.getOption(FILTER); + if (!filter.isVoid()) { + //TODO: if both subject _and_ filter are specified, + //combine in some way; for now we just ignore the + //subject in that case. + bind(filter); + } else if (address.hasSubject()) { + //Note: This will not work for headers- or xml- exchange; + //fanout exchange will do no filtering. + //TODO: for headers- or xml- exchange can construct a match + //for the subject in the application-headers + bind(address.getSubject()); + } else { + //Neither a subject nor a filter has been defined, treat this + //as wanting to match all messages (Note: direct exchange is + //currently unable to support this case). + if (!exchangeType.empty()) bindSpecial(exchangeType); + else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType()); + } +} void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options) { @@ -280,18 +390,19 @@ void Subscription::add(const std::string& exchange, const std::string& key, cons void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) { - if (name.empty()) { - //TODO: use same scheme as JMS client for subscription queue name generation? - queue = session.getId().getName() + destination; - } else { - queue = name; - } + //create exchange if required and specified by policy: + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); + + //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=true, - arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions); + arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); + //bind subscription queue to exchange: for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options); } - AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + //subscribe to subscription queue: + AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; session.messageSubscribe(arg::queue=queue, arg::destination=destination, arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } @@ -300,36 +411,23 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string { session.messageCancel(destination); session.queueDelete(arg::queue=queue); + checkDelete(session, FOR_RECEIVER); } Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): exchange(e), key(k), options(o) {} -Subscription::SubscriptionMode Subscription::getMode(const std::string& s) -{ - if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE; - else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE; - else if (s == DURABLE_SUBSCRIPTION) return DURABLE; - else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); -} - void convert(qpid::messaging::Message& from, qpid::client::Message& to); -Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, - bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : - name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {} +ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {} -void Exchange::declare(qpid::client::AsyncSession& session, const std::string&) +void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&) { - //TODO: should this really by synchronous? want to get error if not valid... - if (passive) { - sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); - } else { - sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options); - } + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); } -void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { m.message.getDeliveryProperties().setRoutingKey(defaultSubject); @@ -337,22 +435,17 @@ void Exchange::send(qpid::client::AsyncSession& session, const std::string&, Out m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); } -void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} +void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + checkDelete(session, FOR_SENDER); +} -QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, - bool _autoDelete, bool _durable, const FieldTable& _options) : - name(_name), passive(_passive), exclusive(_exclusive), - autoDelete(_autoDelete), durable(_durable), options(_options) {} +QueueSink::QueueSink(const Address& address) : Queue(address) {} void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) { - //TODO: should this really by synchronous? - if (passive) { - sync(session).queueDeclare(arg::queue=name, arg::passive=true); - } else { - sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, - arg::autoDelete=autoDelete, arg::arguments=options); - } + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); } void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { @@ -360,9 +453,10 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou m.status = session.messageTransfer(arg::content=m.message); } -void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} - -void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp +void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + checkDelete(session, FOR_SENDER); +} void convert(qpid::messaging::Message& from, qpid::client::Message& to) { @@ -372,7 +466,7 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to) //TODO: set other delivery properties to.getMessageProperties().setContentType(from.getContentType()); const Address& address = from.getReplyTo(); - if (!address.value.empty()) { + if (!address.getName().empty()) { to.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders()); @@ -381,72 +475,292 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to) Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) { - if (rt.getExchange().empty()) { - if (rt.getRoutingKey().empty()) { - return Address();//empty address - } else { - return Address(rt.getRoutingKey(), QUEUE_ADDRESS); - } + Address address; + if (rt.getExchange().empty()) {//if default exchange, treat as queue + address.setName(rt.getRoutingKey()); + address.setType(QUEUE_ADDRESS); } else { - if (rt.getRoutingKey().empty()) { - return Address(rt.getExchange(), TOPIC_ADDRESS); - } else { - return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT); - } - } + address.setName(rt.getExchange()); + address.setSubject(rt.getRoutingKey()); + address.setType(TOPIC_ADDRESS); + } + return address; } qpid::framing::ReplyTo AddressResolution::convert(const Address& address) { - if (address.type == QUEUE_ADDRESS || address.type.empty()) { - return ReplyTo(EMPTY_STRING, address.value); - } else if (address.type == TOPIC_ADDRESS) { - return ReplyTo(address.value, EMPTY_STRING); - } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { - //need to split the value - string::size_type i = address.value.find(DIVIDER); - if (i != string::npos) { - std::string exchange = address.value.substr(0, i); - std::string routingKey; - if (i+1 < address.value.size()) { - routingKey = address.value.substr(i+1); - } - return ReplyTo(exchange, routingKey); - } else { - return ReplyTo(address.value, EMPTY_STRING); - } + if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { + return ReplyTo(EMPTY_STRING, address.getName()); + } else if (address.getType() == TOPIC_ADDRESS) { + return ReplyTo(address.getName(), address.getSubject()); } else { - QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type); - //treat as queue - return ReplyTo(EMPTY_STRING, address.value); + QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType()); + return ReplyTo(EMPTY_STRING, address.getName());//treat as queue } } bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) { - return address.type == QUEUE_ADDRESS || - (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value); + return address.getType() == QUEUE_ADDRESS || + (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName()); } -bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject) +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address) { - if (address.type.empty()) { - return !session.exchangeQuery(address.value).getNotFound(); - } else if (address.type == TOPIC_ADDRESS) { - return true; - } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { - string::size_type i = address.value.find(DIVIDER); - if (i != string::npos) { - std::string exchange = address.value.substr(0, i); - if (i+1 < address.value.size()) { - subject = address.value.substr(i+1); - } - } + if (address.getType().empty()) { + return !session.exchangeQuery(address.getName()).getNotFound(); + } else if (address.getType() == TOPIC_ADDRESS) { return true; } else { return false; } } +void Subscription::bind(const Variant& filter) +{ + switch (filter.getType()) { + case qpid::messaging::VAR_MAP: + bind(filter.asMap()); + break; + case qpid::messaging::VAR_LIST: + bind(filter.asList()); + break; + default: + add(name, filter.asString()); + break; + } +} + +void Subscription::bind(const Variant::Map& filter) +{ + qpid::framing::FieldTable arguments; + translate(filter, arguments); + add(name, queue, arguments); +} + +void Subscription::bind(const Variant::List& filter) +{ + for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) { + bind(*i); + } +} + +void Subscription::bindSpecial(const std::string& exchangeType) +{ + if (exchangeType == TOPIC_EXCHANGE) { + add(name, WILDCARD_ANY); + } else if (exchangeType == FANOUT_EXCHANGE) { + add(name, queue); + } else if (exchangeType == HEADERS_EXCHANGE) { + //TODO: add special binding for headers exchange to match all messages + } else if (exchangeType == XML_EXCHANGE) { + //TODO: add special binding for xml exchange to match all messages + } else { //E.g. direct + throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType)); + } +} + +Node::Node(const Address& address) : name(address.getName()), + createPolicy(address.getOption(CREATE)), + assertPolicy(address.getOption(ASSERT)), + deletePolicy(address.getOption(DELETE)) {} + +Queue::Queue(const Address& a) : Node(a), + durable(false), + autoDelete(false), + exclusive(false) +{ + configure(a); +} + +void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + QPID_LOG(debug, "Auto-creating queue '" << name << "'"); + try { + sync(session).queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).queueDeclare(arg::queue=name, arg::passive=true); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str()); + } + } +} + +void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(deletePolicy, mode)) { + QPID_LOG(debug, "Auto-deleting queue '" << name << "'"); + sync(session).queueDelete(arg::queue=name); + } +} + +void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + QueueQueryResult result = sync(session).queueQuery(name); + if (result.getQueue() != name) { + throw InvalidAddress((boost::format("Queue not found: %1%") % name).str()); + } else { + if (durable && !result.getDurable()) { + throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str()); + } + if (autoDelete && !result.getAutoDelete()) { + throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str()); + } + if (exclusive && !result.getExclusive()) { + throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str()); + } + if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { + throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + % name % alternateExchange % result.getAlternateExchange()).str()); + } + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (*i->second != *v) { + throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + } + } +} + +void Queue::configure(const Address& address) +{ + const Variant& properties = address.getOption(NODE_PROPERTIES); + if (!properties.isVoid()) { + Variant::Map p = properties.asMap(); + durable = p[DURABLE]; + autoDelete = p[xamqp::AUTO_DELETE]; + exclusive = p[xamqp::EXCLUSIVE]; + alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString(); + if (!p[xamqp::ARGUMENTS].isVoid()) { + translate(p[xamqp::ARGUMENTS].asMap(), arguments); + } + } +} + +Exchange::Exchange(const Address& a) : Node(a), + durable(false), + autoDelete(false) +{ + configure(a); +} + +void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + try { + sync(session).exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str()); + } + } +} + +void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(deletePolicy, mode)) { + sync(session).exchangeDelete(arg::exchange=name); + } +} + +void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name); + if (result.getNotFound()) { + throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); + } else { + if (!type.empty() && result.getType() != type) { + throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + % name % type % result.getType()).str()); + } + if (durable && !result.getDurable()) { + throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); + } + //Note: Can't check auto-delete or alternate-exchange via + //exchange-query-result as these are not returned + //TODO: could use a passive declare to check alternate-exchange + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (i->second != v) { + throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + } + } +} + +void Exchange::configure(const Address& address) +{ + const Variant& properties = address.getOption(NODE_PROPERTIES); + if (!properties.isVoid()) { + Variant::Map p = properties.asMap(); + durable = p[DURABLE]; + autoDelete = p[xamqp::AUTO_DELETE]; + type = p[xamqp::EXCHANGE_TYPE].asString(); + alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString(); + if (!p[xamqp::ARGUMENTS].isVoid()) { + translate(p[xamqp::ARGUMENTS].asMap(), arguments); + } + } +} + + +bool Node::enabled(const Variant& policy, CheckMode mode) +{ + bool result; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +bool Node::createEnabled(const Address& address, CheckMode mode) +{ + const Variant& policy = address.getOption(CREATE); + return enabled(policy, mode); +} + +void Node::convert(const Variant& options, FieldTable& arguments) +{ + if (!options.isVoid()) { + translate(options.asMap(), arguments); + } +} +std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER); +std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER); }}} // namespace qpid::client::amqp0_10 |