summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp60
1 files changed, 45 insertions, 15 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f1295a3b66..16e5fde075 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -129,6 +129,10 @@ const std::string HEADERS_EXCHANGE("headers");
const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("#");
+//exchange prefixes:
+const std::string PREFIX_AMQ("amq.");
+const std::string PREFIX_QPID("qpid.");
+
const Verifier verifier;
}
@@ -199,6 +203,7 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ bool isReservedName();
protected:
const std::string specifiedType;
@@ -233,6 +238,8 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
+ const bool exclusiveQueue;
+ const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -307,6 +314,7 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
+ bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -338,6 +346,12 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
+bool Opt::asBool(bool defaultValue) const
+{
+ if (value) return value->asBool();
+ else return defaultValue;
+}
+
std::string Opt::str() const
{
if (value) return value->asString();
@@ -481,7 +495,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std
if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return (boost::format("%1%_%2%") % base % name).str();
+ return name;
}
}
@@ -490,7 +504,9 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+ exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +566,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -559,15 +575,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- session.queueDelete(arg::queue=queue);
+ if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
checkDelete(session, FOR_RECEIVER);
}
@@ -761,18 +777,32 @@ Exchange::Exchange(const Address& a) : Node(a),
linkBindings.setDefaultExchange(name);
}
+bool Exchange::isReservedName()
+{
+ return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
+}
+
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
- std::string type = specifiedType;
- if (type.empty()) type = TOPIC_EXCHANGE;
- session.exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
+ if (isReservedName()) {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
+ }
+
+ } else {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ }
nodeBindings.bind(session);
session.sync();
} catch (const qpid::framing::NotAllowedException& e) {
@@ -822,7 +852,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (i->second != v) {
+ } else if (*i->second != *v) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}