summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp726
1 files changed, 520 insertions, 206 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f51a96efd9..14b5448a34 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -20,18 +20,24 @@
*/
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/QueueQueryResult.h"
#include "qpid/framing/ReplyTo.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
namespace qpid {
namespace client {
@@ -40,61 +46,145 @@ namespace amqp0_10 {
using qpid::Exception;
using qpid::messaging::Address;
using qpid::messaging::Filter;
+using qpid::messaging::InvalidAddress;
using qpid::messaging::Variant;
+using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
+using qpid::framing::QueueQueryResult;
using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
using namespace qpid::framing::message;
+using namespace boost::assign;
namespace{
-const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
const std::string EMPTY_STRING;
//option names
const std::string BROWSE("browse");
const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NAME("name");
-const std::string UNACKNOWLEDGED("unacknowledged");
+const std::string NO_LOCAL("no-local");
+const std::string FILTER("filter");
+const std::string RELIABILITY("reliability");
+const std::string NAME("subscription-name");
+const std::string NODE_PROPERTIES("node-properties");
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
-const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
-const std::string DIVIDER("/");
-const std::string SIMPLE_SUBSCRIPTION("simple");
-const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
const std::string DURABLE_SUBSCRIPTION("durable");
+const std::string DURABLE("durable");
+
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("*");
}
-class QueueSource : public MessageSource
+//some amqp 0-10 specific options
+namespace xamqp{
+const std::string AUTO_DELETE("x-amqp0-10-auto-delete");
+const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type");
+const std::string EXCLUSIVE("x-amqp0-10-exclusive");
+const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange");
+const std::string ARGUMENTS("x-amqp0-10-arguments");
+const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments");
+const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-queue-arguments");
+}
+
+class Node
+{
+ protected:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ Node(const Address& address);
+
+ const std::string name;
+ Variant createPolicy;
+ Variant assertPolicy;
+ Variant deletePolicy;
+
+ static bool enabled(const Variant& policy, CheckMode mode);
+ static bool createEnabled(const Address& address, CheckMode mode);
+ static void convert(const Variant& option, FieldTable& arguments);
+ static std::vector<std::string> RECEIVER_MODES;
+ static std::vector<std::string> SENDER_MODES;
+};
+
+class Queue : protected Node
{
public:
- QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED,
- bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Queue(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ private:
+ bool durable;
+ bool autoDelete;
+ bool exclusive;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class Exchange : protected Node
+{
+ public:
+ Exchange(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ const std::string& getDesiredExchangeType() { return type; }
+
+ private:
+ std::string type;
+ bool durable;
+ bool autoDelete;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+ public:
+ QueueSource(const Address& address);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
- const std::string name;
const AcceptMode acceptMode;
const AcquireMode acquireMode;
const bool exclusive;
- const FieldTable options;
+ FieldTable options;
};
-class Subscription : public MessageSource
+class Subscription : public Exchange, public MessageSource
{
public:
- enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
-
- Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
- const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Subscription(const Address&, const std::string& exchangeType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
-
- static SubscriptionMode getMode(const std::string& mode);
private:
struct Binding
{
@@ -107,155 +197,138 @@ class Subscription : public MessageSource
typedef std::vector<Binding> Bindings;
- const std::string name;
- const bool autoDelete;
+ const std::string queue;
+ const bool reliable;
const bool durable;
- const FieldTable queueOptions;
- const FieldTable subscriptionOptions;
+ FieldTable queueOptions;
+ FieldTable subscriptionOptions;
Bindings bindings;
- std::string queue;
+
+ void bindSpecial(const std::string& exchangeType);
+ void bind(const Variant& filter);
+ void bind(const Variant::Map& filter);
+ void bind(const Variant::List& filter);
+ void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ static std::string getSubscriptionName(const std::string& base, const Variant& name);
};
-class Exchange : public MessageSink
+class ExchangeSink : public Exchange, public MessageSink
{
public:
- Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING,
- bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
- const FieldTable& options = EMPTY_FIELD_TABLE);
+ ExchangeSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
const std::string defaultSubject;
- const bool passive;
- const std::string type;
- const bool durable;
- const FieldTable options;
};
-class QueueSink : public MessageSink
+class QueueSink : public Queue, public MessageSink
{
public:
- QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
- bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ QueueSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
- const bool passive;
- const bool exclusive;
- const bool autoDelete;
- const bool durable;
- const FieldTable options;
};
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
-const Variant& getOption(const std::string& key, const Variant::Map& options)
+bool in(const Variant& value, const std::vector<std::string>& choices)
{
- Variant::Map::const_iterator i = options.find(key);
- if (i == options.end()) return EMPTY_VARIANT;
- else return i->second;
+ if (!value.isVoid()) {
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value.asString() == *i) return true;
+ }
+ }
+ return false;
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
+}
+
+bool is_unreliable(const Address& address)
+{
+ return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool is_reliable(const Address& address)
+{
+ return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
- const Address& address,
- const Filter* filter,
- const Variant::Map& options)
+ const Address& address)
{
//TODO: handle case where there exists a queue and an exchange of
//the same name (hence an unqualified address is ambiguous)
//TODO: make sure specified address type gives sane error message
- //if it does npt match the configuration on server
+ //if it does not match the configuration on server
+
+ /**
+ if (Node::createEnabled(address, FOR_RECEIVER)) {
+ } else {
+ }
+ **/
if (isQueue(session, address)) {
- //TODO: support auto-created queue as source, if requested by specific option
-
- AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
- AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
- bool exclusive = getOption(EXCLUSIVE, options).asBool();
- FieldTable arguments;
- //TODO: extract subscribe arguments from options (e.g. either
- //filter out already processed keys and send the rest, or have
- //a nested map)
-
- std::auto_ptr<MessageSource> source =
- std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "resolved source address as queue: " << address);
return source;
} else {
- //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
- std::auto_ptr<Subscription> bindings =
- std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(),
- Subscription::getMode(getOption(MODE, options).asString())));
-
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
- if (result.getNotFound()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
- } else if (result.getType() == "topic") {
- if (filter) {
- if (filter->type != Filter::WILDCARD) {
- throw qpid::framing::NotImplementedException(
- QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
-
- }
- for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
- bindings->add(address.value, *i, qpid::framing::FieldTable());
- }
- } else {
- //default is to receive all messages
- bindings->add(address.value, "*", qpid::framing::FieldTable());
- }
- } else if (result.getType() == "fanout") {
- if (filter) {
- throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));
- }
- bindings->add(address.value, address.value, qpid::framing::FieldTable());
- } else if (result.getType() == "direct") {
- //TODO: ????
- } else {
- //TODO: xml and headers exchanges
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));
- }
- std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
+ std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
+ QPID_LOG(debug, "resolved source address as topic: " << address);
return source;
}
}
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& /*options*/)
+ const qpid::messaging::Address& address)
{
std::auto_ptr<MessageSink> sink;
if (isQueue(session, address)) {
- //TODO: support for auto-created queues as sink
- sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+ sink = std::auto_ptr<MessageSink>(new QueueSink(address));
} else {
- std::string subject;
- if (isTopic(session, address, subject)) {
- //TODO: support for auto-created exchanges as sink
- sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+ if (isTopic(session, address)) {
+ sink = std::auto_ptr<MessageSink>(new ExchangeSink(address));
} else {
- if (address.type.empty()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+ if (address.getType().empty()) {
+ throw InvalidAddress(QPID_MSG("Address not known: " << address));
} else {
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+ throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType()));
}
}
}
return sink;
}
-QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
- name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+QueueSource::QueueSource(const Address& address) :
+ Queue(address),
+ acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+ exclusive(address.getOption(EXCLUSIVE).asBool())
+{
+ //extract subscription arguments from address options
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
+}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -267,11 +340,48 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
session.messageCancel(destination);
+ checkDelete(session, FOR_RECEIVER);
}
-Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
- : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE),
- queueOptions(qOptions), subscriptionOptions(sOptions) {}
+std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+{
+ if (name.isVoid()) {
+ return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+ } else {
+ return (boost::format("%1%_%2%") % base % name.asString()).str();
+ }
+}
+
+Subscription::Subscription(const Address& address, const std::string& exchangeType)
+ : Exchange(address),
+ queue(getSubscriptionName(name, address.getOption(NAME))),
+ reliable(is_reliable(address)),
+ durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+{
+ if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
+ convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions);
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions);
+
+ const Variant& filter = address.getOption(FILTER);
+ if (!filter.isVoid()) {
+ //TODO: if both subject _and_ filter are specified,
+ //combine in some way; for now we just ignore the
+ //subject in that case.
+ bind(filter);
+ } else if (address.hasSubject()) {
+ //Note: This will not work for headers- or xml- exchange;
+ //fanout exchange will do no filtering.
+ //TODO: for headers- or xml- exchange can construct a match
+ //for the subject in the application-headers
+ bind(address.getSubject());
+ } else {
+ //Neither a subject nor a filter has been defined, treat this
+ //as wanting to match all messages (Note: direct exchange is
+ //currently unable to support this case).
+ if (!exchangeType.empty()) bindSpecial(exchangeType);
+ else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ }
+}
void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
{
@@ -280,18 +390,19 @@ void Subscription::add(const std::string& exchange, const std::string& key, cons
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
- if (name.empty()) {
- //TODO: use same scheme as JMS client for subscription queue name generation?
- queue = session.getId().getName() + destination;
- } else {
- queue = name;
- }
+ //create exchange if required and specified by policy:
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+
+ //create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
- arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+ arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+ //bind subscription queue to exchange:
for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
}
- AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+ //subscribe to subscription queue:
+ AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
@@ -300,36 +411,23 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string
{
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
+ checkDelete(session, FOR_RECEIVER);
}
Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
exchange(e), key(k), options(o) {}
-Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
-{
- if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
- else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
- else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
- else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s));
-}
-
void convert(qpid::messaging::Message& from, qpid::client::Message& to);
-Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject,
- bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) :
- name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {}
-void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
- //TODO: should this really by synchronous? want to get error if not valid...
- if (passive) {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } else {
- sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
- }
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
@@ -337,22 +435,17 @@ void Exchange::send(qpid::client::AsyncSession& session, const std::string&, Out
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
-void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
-QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive,
- bool _autoDelete, bool _durable, const FieldTable& _options) :
- name(_name), passive(_passive), exclusive(_exclusive),
- autoDelete(_autoDelete), durable(_durable), options(_options) {}
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
- //TODO: should this really by synchronous?
- if (passive) {
- sync(session).queueDeclare(arg::queue=name, arg::passive=true);
- } else {
- sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable,
- arg::autoDelete=autoDelete, arg::arguments=options);
- }
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -360,9 +453,10 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
m.status = session.messageTransfer(arg::content=m.message);
}
-void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
-
-void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
void convert(qpid::messaging::Message& from, qpid::client::Message& to)
{
@@ -372,7 +466,7 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to)
//TODO: set other delivery properties
to.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
- if (!address.value.empty()) {
+ if (!address.getName().empty()) {
to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
@@ -381,72 +475,292 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to)
Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
{
- if (rt.getExchange().empty()) {
- if (rt.getRoutingKey().empty()) {
- return Address();//empty address
- } else {
- return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
- }
+ Address address;
+ if (rt.getExchange().empty()) {//if default exchange, treat as queue
+ address.setName(rt.getRoutingKey());
+ address.setType(QUEUE_ADDRESS);
} else {
- if (rt.getRoutingKey().empty()) {
- return Address(rt.getExchange(), TOPIC_ADDRESS);
- } else {
- return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
- }
- }
+ address.setName(rt.getExchange());
+ address.setSubject(rt.getRoutingKey());
+ address.setType(TOPIC_ADDRESS);
+ }
+ return address;
}
qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
{
- if (address.type == QUEUE_ADDRESS || address.type.empty()) {
- return ReplyTo(EMPTY_STRING, address.value);
- } else if (address.type == TOPIC_ADDRESS) {
- return ReplyTo(address.value, EMPTY_STRING);
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- //need to split the value
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- std::string routingKey;
- if (i+1 < address.value.size()) {
- routingKey = address.value.substr(i+1);
- }
- return ReplyTo(exchange, routingKey);
- } else {
- return ReplyTo(address.value, EMPTY_STRING);
- }
+ if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ return ReplyTo(EMPTY_STRING, address.getName());
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return ReplyTo(address.getName(), address.getSubject());
} else {
- QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
- //treat as queue
- return ReplyTo(EMPTY_STRING, address.value);
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+ return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
}
}
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.type == QUEUE_ADDRESS ||
- (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+ return address.getType() == QUEUE_ADDRESS ||
+ (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
{
- if (address.type.empty()) {
- return !session.exchangeQuery(address.value).getNotFound();
- } else if (address.type == TOPIC_ADDRESS) {
- return true;
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- if (i+1 < address.value.size()) {
- subject = address.value.substr(i+1);
- }
- }
+ if (address.getType().empty()) {
+ return !session.exchangeQuery(address.getName()).getNotFound();
+ } else if (address.getType() == TOPIC_ADDRESS) {
return true;
} else {
return false;
}
}
+void Subscription::bind(const Variant& filter)
+{
+ switch (filter.getType()) {
+ case qpid::messaging::VAR_MAP:
+ bind(filter.asMap());
+ break;
+ case qpid::messaging::VAR_LIST:
+ bind(filter.asList());
+ break;
+ default:
+ add(name, filter.asString());
+ break;
+ }
+}
+
+void Subscription::bind(const Variant::Map& filter)
+{
+ qpid::framing::FieldTable arguments;
+ translate(filter, arguments);
+ add(name, queue, arguments);
+}
+
+void Subscription::bind(const Variant::List& filter)
+{
+ for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
+ bind(*i);
+ }
+}
+
+void Subscription::bindSpecial(const std::string& exchangeType)
+{
+ if (exchangeType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (exchangeType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (exchangeType == HEADERS_EXCHANGE) {
+ //TODO: add special binding for headers exchange to match all messages
+ } else if (exchangeType == XML_EXCHANGE) {
+ //TODO: add special binding for xml exchange to match all messages
+ } else { //E.g. direct
+ throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
+ }
+}
+
+Node::Node(const Address& address) : name(address.getName()),
+ createPolicy(address.getOption(CREATE)),
+ assertPolicy(address.getOption(ASSERT)),
+ deletePolicy(address.getOption(DELETE)) {}
+
+Queue::Queue(const Address& a) : Node(a),
+ durable(false),
+ autoDelete(false),
+ exclusive(false)
+{
+ configure(a);
+}
+
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+ try {
+ sync(session).queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str());
+ }
+ }
+}
+
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(deletePolicy, mode)) {
+ QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+ sync(session).queueDelete(arg::queue=name);
+ }
+}
+
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ QueueQueryResult result = sync(session).queueQuery(name);
+ if (result.getQueue() != name) {
+ throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+ } else {
+ if (durable && !result.getDurable()) {
+ throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+ }
+ if (autoDelete && !result.getAutoDelete()) {
+ throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ }
+ if (exclusive && !result.getExclusive()) {
+ throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+ }
+ if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+ throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ % name % alternateExchange % result.getAlternateExchange()).str());
+ }
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (*i->second != *v) {
+ throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ }
+ }
+}
+
+void Queue::configure(const Address& address)
+{
+ const Variant& properties = address.getOption(NODE_PROPERTIES);
+ if (!properties.isVoid()) {
+ Variant::Map p = properties.asMap();
+ durable = p[DURABLE];
+ autoDelete = p[xamqp::AUTO_DELETE];
+ exclusive = p[xamqp::EXCLUSIVE];
+ alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+ if (!p[xamqp::ARGUMENTS].isVoid()) {
+ translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+ }
+ }
+}
+
+Exchange::Exchange(const Address& a) : Node(a),
+ durable(false),
+ autoDelete(false)
+{
+ configure(a);
+}
+
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+ }
+ }
+}
+
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(deletePolicy, mode)) {
+ sync(session).exchangeDelete(arg::exchange=name);
+ }
+}
+
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+ if (result.getNotFound()) {
+ throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+ } else {
+ if (!type.empty() && result.getType() != type) {
+ throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ % name % type % result.getType()).str());
+ }
+ if (durable && !result.getDurable()) {
+ throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+ }
+ //Note: Can't check auto-delete or alternate-exchange via
+ //exchange-query-result as these are not returned
+ //TODO: could use a passive declare to check alternate-exchange
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (i->second != v) {
+ throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ }
+ }
+}
+
+void Exchange::configure(const Address& address)
+{
+ const Variant& properties = address.getOption(NODE_PROPERTIES);
+ if (!properties.isVoid()) {
+ Variant::Map p = properties.asMap();
+ durable = p[DURABLE];
+ autoDelete = p[xamqp::AUTO_DELETE];
+ type = p[xamqp::EXCHANGE_TYPE].asString();
+ alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+ if (!p[xamqp::ARGUMENTS].isVoid()) {
+ translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+ }
+ }
+}
+
+
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+ bool result;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+ const Variant& policy = address.getOption(CREATE);
+ return enabled(policy, mode);
+}
+
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+ if (!options.isVoid()) {
+ translate(options.asMap(), arguments);
+ }
+}
+std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
}}} // namespace qpid::client::amqp0_10