diff options
| author | Gordon Sim <gsim@apache.org> | 2013-05-24 16:49:28 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-05-24 16:49:28 +0000 |
| commit | 02e58b534bb5a768403c2200c990c1ebf9681cd9 (patch) | |
| tree | 3b3bcfc7ea7fe0b66242dd4b19e0121ee9970af6 /qpid/cpp | |
| parent | 956c316470d1f8e884b127f251043ebe16893e96 (diff) | |
| download | qpid-python-02e58b534bb5a768403c2200c990c1ebf9681cd9.tar.gz | |
QPID-4888: correct handling of link naming
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486115 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
17 files changed, 144 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index be98c048b6..d706e6e49e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -179,6 +179,7 @@ void Connection::process() if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { QPID_LOG_CAT(debug, model, id << " connection opened"); pn_connection_set_container(connection, broker.getFederationTag().c_str()); + setContainerId(pn_connection_remote_container(connection)); pn_connection_open(connection); } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index 1574a5163f..ec586ea28f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -77,6 +78,20 @@ void ManagedConnection::setSaslSsf(int ssf) } } +void ManagedConnection::setContainerId(const std::string& container) +{ + containerid = container; + if (connection) { + qpid::types::Variant::Map props; + props["container-id"] = containerid; + connection->set_remoteProperties(props); + } +} +const std::string& ManagedConnection::getContainerId() const +{ + return containerid; +} + qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const { return connection; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index e2d0376918..f2037d4837 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -44,6 +44,8 @@ class ManagedConnection : public qpid::management::Manageable, public Connection std::string getUserid() const; void setSaslMechanism(const std::string&); void setSaslSsf(int); + void setContainerId(const std::string&); + const std::string& getContainerId() const; qpid::management::ManagementObject::shared_ptr GetManagementObject() const; bool isLocal(const ConnectionToken* t) const; void incomingMessageReceived(); @@ -51,6 +53,7 @@ class ManagedConnection : public qpid::management::Manageable, public Connection private: const std::string id; std::string userid; + std::string containerid; qmf::org::apache::qpid::broker::Connection::shared_ptr connection; qpid::management::ManagementAgent* agent; }; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp index c9875c457b..d6ba18077c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp @@ -19,6 +19,7 @@ * */ #include "ManagedIncomingLink.h" +#include "qpid/broker/amqp/ManagedConnection.h" #include "qpid/broker/amqp/ManagedSession.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" @@ -35,7 +36,7 @@ ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, cons { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, source, target, _name)); + incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target)); agent->addObject(incoming); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index 4c5f7fc1fc..d60fbf4b59 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -19,6 +19,7 @@ * */ #include "ManagedOutgoingLink.h" +#include "qpid/broker/amqp/ManagedConnection.h" #include "qpid/broker/amqp/ManagedSession.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" @@ -35,7 +36,7 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, cons { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, target, _name)); + outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target)); agent->addObject(outgoing); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp index 9bef0e842b..25e124d770 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -84,5 +84,9 @@ void ManagedSession::incomingMessageRejected() { } +ManagedConnection& ManagedSession::getParent() +{ + return parent; +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h index 1f56964bb6..5e11d6e2ca 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -48,6 +48,7 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke void outgoingMessageSent(); void outgoingMessageAccepted(); void outgoingMessageRejected(); + ManagedConnection& getParent(); private: ManagedConnection& parent; const std::string id; diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp index eea7612cb9..a937c1171e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -39,12 +39,13 @@ const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); //AMQP 0-10 standard parameters: const std::string DURABLE("durable"); +const std::string EXCLUSIVE("exclusive"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); const std::string EXCHANGE_TYPE("exchange-type"); } -NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exchangeType("topic") {} +NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {} void NodeProperties::read(pn_data_t* data) { @@ -60,6 +61,8 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant& else if (value == COPY) queue = false; } else if (key == DURABLE) { durable = value; + } else if (key == EXCLUSIVE) { + exclusive = value; } else if (key == AUTO_DELETE) { autoDelete = value; } else if (key == ALTERNATE_EXCHANGE) { @@ -167,6 +170,10 @@ bool NodeProperties::isDurable() const { return durable; } +bool NodeProperties::isExclusive() const +{ + return exclusive; +} std::string NodeProperties::getExchangeType() const { return exchangeType; diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h index b81d1d712c..881fc4e30f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -54,12 +54,14 @@ class NodeProperties : public qpid::amqp::MapReader bool isQueue() const; QueueSettings getQueueSettings(); bool isDurable() const; + bool isExclusive() const; std::string getExchangeType() const; std::string getAlternateExchange() const; private: bool queue; bool durable; bool autoDelete; + bool exclusive; std::string exchangeType; std::string alternateExchange; qpid::types::Variant::Map properties; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index afa87db1c3..adde339fa5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -36,7 +36,6 @@ #include "qpid/broker/Selector.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/amqp/Filter.h" -#include "qpid/broker/amqp/NodeProperties.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" @@ -133,13 +132,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te if (!node.queue && !node.exchange) { if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { //is it a queue or an exchange? - NodeProperties properties; - properties.read(pn_terminus_properties(terminus)); - if (properties.isQueue()) { - node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; + node.properties.read(pn_terminus_properties(terminus)); + if (node.properties.isQueue()) { + node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; } else { qpid::framing::FieldTable args; - node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(), + node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), args, connection.getUserid(), connection.getId()).first; } } else { @@ -164,6 +162,11 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); node.exchange.reset(); } + + if (node.properties.isExclusive() && node.queue && node.queue->setExclusiveOwner(this)) { + exclusiveQueues.insert(node.queue); + } + return node; } @@ -283,14 +286,17 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s } outgoing[link] = q; } else if (node.exchange) { - QueueSettings settings(false, true); + bool durable = pn_terminus_get_durability(source); + QueueSettings settings(durable, !durable); if (filter.hasSelectorFilter()) { settings.filter = filter.getSelectorFilter(); QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter); } //TODO: populate settings from source details when available from engine + std::stringstream queueName;//combination of container id and link name is unique + queueName << connection.getContainerId() << "_" << pn_link_name(link); boost::shared_ptr<qpid::broker::Queue> queue - = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; + = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first; queue->setExclusiveOwner(this); if (filter.hasSubjectFilter()) { filter.bind(node.exchange, queue); @@ -430,6 +436,7 @@ bool Session::dispatch() void Session::close() { + exclusiveQueues.clear(); for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { i->second->detached(); } @@ -439,6 +446,9 @@ void Session::close() outgoing.clear(); incoming.clear(); QPID_LOG(debug, "Session closed, all links detached."); + for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) { + (*i)->releaseExclusiveOwnership(); + } qpid::sys::Mutex::ScopedLock l(lock); deleted = true; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index fa2b029986..19922f3ee1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -24,8 +24,10 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/OutputControl.h" #include "qpid/broker/amqp/ManagedSession.h" +#include "qpid/broker/amqp/NodeProperties.h" #include <deque> #include <map> +#include <set> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> @@ -85,11 +87,13 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::deque<pn_delivery_t*> completed; bool deleted; qpid::sys::Mutex lock; + std::set< boost::shared_ptr<Queue> > exclusiveQueues; struct ResolvedNode { boost::shared_ptr<qpid::broker::Exchange> exchange; boost::shared_ptr<qpid::broker::Queue> queue; boost::shared_ptr<Relay> relay; + NodeProperties properties; }; ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp index ecd5ba9693..746666a79c 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -113,6 +113,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& sslCertName = value.asString(); } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { reconnectOnLimitExceeded = value; + } else if (name == "container-id" || name == "container_id") { + identifier = value.asString(); } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h index 6786fd4a64..7a701fce65 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h @@ -42,6 +42,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings double maxReconnectInterval; int32_t retries; bool reconnectOnLimitExceeded; + std::string identifier; ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); void set(const std::string& name, const qpid::types::Variant& value); diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 66d9bc81da..258b4b84bd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -24,7 +24,9 @@ #include "qpid/log/Statement.h" #include <vector> #include <set> +#include <sstream> #include <boost/assign.hpp> +#include <boost/format.hpp> extern "C" { #include <proton/engine.h> } @@ -59,6 +61,9 @@ const std::string TYPE("type"); const std::string TOPIC("topic"); const std::string QUEUE("queue"); const std::string DURABLE("durable"); +const std::string NAME("name"); +const std::string RELIABILITY("reliability"); +const std::string SELECTOR("selector"); //distribution modes: const std::string MOVE("move"); @@ -77,6 +82,17 @@ const std::string ARGUMENTS("arguments"); const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); +class Verifier +{ + public: + Verifier(); + void verify(const Address& address) const; + private: + Variant::Map defined; + void verify(const Variant::Map& allowed, const Variant::Map& actual) const; +}; +const Verifier verifier; + pn_bytes_t convert(const std::string& s) { pn_bytes_t result; @@ -177,6 +193,7 @@ AddressHelper::AddressHelper(const Address& address) : durableLink(false), browse(false) { + verifier.verify(address); bind(address, CREATE, createPolicy); bind(address, DELETE, deletePolicy); bind(address, ASSERT, assertPolicy); @@ -191,6 +208,7 @@ AddressHelper::AddressHelper(const Address& address) : if (bind(address, MODE, mode)) { if (mode == BROWSE) { browse = true; + throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0."); } else if (mode != CONSUME) { throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'."); } @@ -325,6 +343,19 @@ void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) pn_data_put_symbol(data, convert(i->asString())); } } +std::string AddressHelper::getLinkName(const Address& address) +{ + AddressHelper helper(address); + const qpid::types::Variant::Map& linkProps = helper.getLinkProperties(); + qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME); + if (i != linkProps.end()) { + return i->second.asString(); + } else { + std::stringstream name; + name << address.getName() << "_" << qpid::types::Uuid(true); + return name.str(); + } +} void AddressHelper::setNodeProperties(pn_terminus_t* terminus) { @@ -348,4 +379,45 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus) } } +Verifier::Verifier() +{ + defined[CREATE] = true; + defined[ASSERT] = true; + defined[DELETE] = true; + defined[MODE] = true; + Variant::Map node; + node[TYPE] = true; + node[DURABLE] = true; + node[PROPERTIES] = true; + node[CAPABILITIES] = true; + node[X_DECLARE] = true; + node[X_BINDINGS] = true; + defined[NODE] = node; + Variant::Map link; + link[NAME] = true; + link[DURABLE] = true; + link[RELIABILITY] = true; + link[X_SUBSCRIBE] = true; + link[X_DECLARE] = true; + link[X_BINDINGS] = true; + link[SELECTOR] = true; + defined[LINK] = link; +} +void Verifier::verify(const Address& address) const +{ + verify(defined, address.getOptions()); +} + +void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const +{ + for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) { + Variant::Map::const_iterator option = allowed.find(i->first); + if (option == allowed.end()) { + throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str()); + } else if (option->second.getType() == qpid::types::VAR_MAP) { + verify(option->second.asMap(), i->second.asMap()); + } + } +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index da666feb92..d03370b597 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -41,6 +41,7 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; const qpid::types::Variant::Map& getLinkProperties() const; + static std::string getLinkName(const Address& address); private: bool isTemporary; std::string createPolicy; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 5f6fda81c5..c9623f568c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -60,7 +60,10 @@ ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Va if (pn_transport_bind(engine, connection)) { //error } - pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option + if (identifier.empty()) { + identifier = qpid::types::Uuid(true).str(); + } + pn_connection_set_container(connection, identifier.c_str()); bool enableTrace(false); QPID_LOG_TEST_CAT(trace, protocol, enableTrace); if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 004ee4e775..64462215f3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -43,12 +43,8 @@ SessionContext::~SessionContext() boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address) { - std::string name = address.getName(); - - int count = 1; - for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { - name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); - } + std::string name = AddressHelper::getLinkName(address); + if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); senders[name] = s; return s; @@ -56,12 +52,8 @@ boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messag boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) { - std::string name = address.getName(); - - int count = 1; - for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { - name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); - } + std::string name = AddressHelper::getLinkName(address); + if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); receivers[name] = r; return r; |
