summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-24 16:49:28 +0000
committerGordon Sim <gsim@apache.org>2013-05-24 16:49:28 +0000
commit02e58b534bb5a768403c2200c990c1ebf9681cd9 (patch)
tree3b3bcfc7ea7fe0b66242dd4b19e0121ee9970af6 /qpid/cpp
parent956c316470d1f8e884b127f251043ebe16893e96 (diff)
downloadqpid-python-02e58b534bb5a768403c2200c990c1ebf9681cd9.tar.gz
QPID-4888: correct handling of link naming
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1486115 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp72
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp16
17 files changed, 144 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index be98c048b6..d706e6e49e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -179,6 +179,7 @@ void Connection::process()
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
QPID_LOG_CAT(debug, model, id << " connection opened");
pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ setContainerId(pn_connection_remote_container(connection));
pn_connection_open(connection);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index 1574a5163f..ec586ea28f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
@@ -77,6 +78,20 @@ void ManagedConnection::setSaslSsf(int ssf)
}
}
+void ManagedConnection::setContainerId(const std::string& container)
+{
+ containerid = container;
+ if (connection) {
+ qpid::types::Variant::Map props;
+ props["container-id"] = containerid;
+ connection->set_remoteProperties(props);
+ }
+}
+const std::string& ManagedConnection::getContainerId() const
+{
+ return containerid;
+}
+
qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
{
return connection;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index e2d0376918..f2037d4837 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -44,6 +44,8 @@ class ManagedConnection : public qpid::management::Manageable, public Connection
std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
+ void setContainerId(const std::string&);
+ const std::string& getContainerId() const;
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
@@ -51,6 +53,7 @@ class ManagedConnection : public qpid::management::Manageable, public Connection
private:
const std::string id;
std::string userid;
+ std::string containerid;
qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
qpid::management::ManagementAgent* agent;
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
index c9875c457b..d6ba18077c 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ManagedIncomingLink.h"
+#include "qpid/broker/amqp/ManagedConnection.h"
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
@@ -35,7 +36,7 @@ ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, cons
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, source, target, _name));
+ incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
agent->addObject(incoming);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
index 4c5f7fc1fc..d60fbf4b59 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ManagedOutgoingLink.h"
+#include "qpid/broker/amqp/ManagedConnection.h"
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
@@ -35,7 +36,7 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, cons
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, target, _name));
+ outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
agent->addObject(outgoing);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
index 9bef0e842b..25e124d770 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
@@ -84,5 +84,9 @@ void ManagedSession::incomingMessageRejected()
{
}
+ManagedConnection& ManagedSession::getParent()
+{
+ return parent;
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
index 1f56964bb6..5e11d6e2ca 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
@@ -48,6 +48,7 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke
void outgoingMessageSent();
void outgoingMessageAccepted();
void outgoingMessageRejected();
+ ManagedConnection& getParent();
private:
ManagedConnection& parent;
const std::string id;
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
index eea7612cb9..a937c1171e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -39,12 +39,13 @@ const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
//AMQP 0-10 standard parameters:
const std::string DURABLE("durable");
+const std::string EXCLUSIVE("exclusive");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
}
-NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exchangeType("topic") {}
+NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {}
void NodeProperties::read(pn_data_t* data)
{
@@ -60,6 +61,8 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant&
else if (value == COPY) queue = false;
} else if (key == DURABLE) {
durable = value;
+ } else if (key == EXCLUSIVE) {
+ exclusive = value;
} else if (key == AUTO_DELETE) {
autoDelete = value;
} else if (key == ALTERNATE_EXCHANGE) {
@@ -167,6 +170,10 @@ bool NodeProperties::isDurable() const
{
return durable;
}
+bool NodeProperties::isExclusive() const
+{
+ return exclusive;
+}
std::string NodeProperties::getExchangeType() const
{
return exchangeType;
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
index b81d1d712c..881fc4e30f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -54,12 +54,14 @@ class NodeProperties : public qpid::amqp::MapReader
bool isQueue() const;
QueueSettings getQueueSettings();
bool isDurable() const;
+ bool isExclusive() const;
std::string getExchangeType() const;
std::string getAlternateExchange() const;
private:
bool queue;
bool durable;
bool autoDelete;
+ bool exclusive;
std::string exchangeType;
std::string alternateExchange;
qpid::types::Variant::Map properties;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index afa87db1c3..adde339fa5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -36,7 +36,6 @@
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
-#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -133,13 +132,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
- NodeProperties properties;
- properties.read(pn_terminus_properties(terminus));
- if (properties.isQueue()) {
- node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ node.properties.read(pn_terminus_properties(terminus));
+ if (node.properties.isQueue()) {
+ node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
- node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
+ node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserid(), connection.getId()).first;
}
} else {
@@ -164,6 +162,11 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
node.exchange.reset();
}
+
+ if (node.properties.isExclusive() && node.queue && node.queue->setExclusiveOwner(this)) {
+ exclusiveQueues.insert(node.queue);
+ }
+
return node;
}
@@ -283,14 +286,17 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
}
outgoing[link] = q;
} else if (node.exchange) {
- QueueSettings settings(false, true);
+ bool durable = pn_terminus_get_durability(source);
+ QueueSettings settings(durable, !durable);
if (filter.hasSelectorFilter()) {
settings.filter = filter.getSelectorFilter();
QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter);
}
//TODO: populate settings from source details when available from engine
+ std::stringstream queueName;//combination of container id and link name is unique
+ queueName << connection.getContainerId() << "_" << pn_link_name(link);
boost::shared_ptr<qpid::broker::Queue> queue
- = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
queue->setExclusiveOwner(this);
if (filter.hasSubjectFilter()) {
filter.bind(node.exchange, queue);
@@ -430,6 +436,7 @@ bool Session::dispatch()
void Session::close()
{
+ exclusiveQueues.clear();
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
i->second->detached();
}
@@ -439,6 +446,9 @@ void Session::close()
outgoing.clear();
incoming.clear();
QPID_LOG(debug, "Session closed, all links detached.");
+ for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) {
+ (*i)->releaseExclusiveOwnership();
+ }
qpid::sys::Mutex::ScopedLock l(lock);
deleted = true;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index fa2b029986..19922f3ee1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -24,8 +24,10 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/amqp/ManagedSession.h"
+#include "qpid/broker/amqp/NodeProperties.h"
#include <deque>
#include <map>
+#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -85,11 +87,13 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::deque<pn_delivery_t*> completed;
bool deleted;
qpid::sys::Mutex lock;
+ std::set< boost::shared_ptr<Queue> > exclusiveQueues;
struct ResolvedNode
{
boost::shared_ptr<qpid::broker::Exchange> exchange;
boost::shared_ptr<qpid::broker::Queue> queue;
boost::shared_ptr<Relay> relay;
+ NodeProperties properties;
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
index ecd5ba9693..746666a79c 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
@@ -113,6 +113,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
sslCertName = value.asString();
} else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
reconnectOnLimitExceeded = value;
+ } else if (name == "container-id" || name == "container_id") {
+ identifier = value.asString();
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
index 6786fd4a64..7a701fce65 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
@@ -42,6 +42,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
+ std::string identifier;
ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
void set(const std::string& name, const qpid::types::Variant& value);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 66d9bc81da..258b4b84bd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -24,7 +24,9 @@
#include "qpid/log/Statement.h"
#include <vector>
#include <set>
+#include <sstream>
#include <boost/assign.hpp>
+#include <boost/format.hpp>
extern "C" {
#include <proton/engine.h>
}
@@ -59,6 +61,9 @@ const std::string TYPE("type");
const std::string TOPIC("topic");
const std::string QUEUE("queue");
const std::string DURABLE("durable");
+const std::string NAME("name");
+const std::string RELIABILITY("reliability");
+const std::string SELECTOR("selector");
//distribution modes:
const std::string MOVE("move");
@@ -77,6 +82,17 @@ const std::string ARGUMENTS("arguments");
const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
+class Verifier
+{
+ public:
+ Verifier();
+ void verify(const Address& address) const;
+ private:
+ Variant::Map defined;
+ void verify(const Variant::Map& allowed, const Variant::Map& actual) const;
+};
+const Verifier verifier;
+
pn_bytes_t convert(const std::string& s)
{
pn_bytes_t result;
@@ -177,6 +193,7 @@ AddressHelper::AddressHelper(const Address& address) :
durableLink(false),
browse(false)
{
+ verifier.verify(address);
bind(address, CREATE, createPolicy);
bind(address, DELETE, deletePolicy);
bind(address, ASSERT, assertPolicy);
@@ -191,6 +208,7 @@ AddressHelper::AddressHelper(const Address& address) :
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
browse = true;
+ throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0.");
} else if (mode != CONSUME) {
throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
}
@@ -325,6 +343,19 @@ void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
pn_data_put_symbol(data, convert(i->asString()));
}
}
+std::string AddressHelper::getLinkName(const Address& address)
+{
+ AddressHelper helper(address);
+ const qpid::types::Variant::Map& linkProps = helper.getLinkProperties();
+ qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME);
+ if (i != linkProps.end()) {
+ return i->second.asString();
+ } else {
+ std::stringstream name;
+ name << address.getName() << "_" << qpid::types::Uuid(true);
+ return name.str();
+ }
+}
void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
{
@@ -348,4 +379,45 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
}
}
+Verifier::Verifier()
+{
+ defined[CREATE] = true;
+ defined[ASSERT] = true;
+ defined[DELETE] = true;
+ defined[MODE] = true;
+ Variant::Map node;
+ node[TYPE] = true;
+ node[DURABLE] = true;
+ node[PROPERTIES] = true;
+ node[CAPABILITIES] = true;
+ node[X_DECLARE] = true;
+ node[X_BINDINGS] = true;
+ defined[NODE] = node;
+ Variant::Map link;
+ link[NAME] = true;
+ link[DURABLE] = true;
+ link[RELIABILITY] = true;
+ link[X_SUBSCRIBE] = true;
+ link[X_DECLARE] = true;
+ link[X_BINDINGS] = true;
+ link[SELECTOR] = true;
+ defined[LINK] = link;
+}
+void Verifier::verify(const Address& address) const
+{
+ verify(defined, address.getOptions());
+}
+
+void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const
+{
+ for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) {
+ Variant::Map::const_iterator option = allowed.find(i->first);
+ if (option == allowed.end()) {
+ throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str());
+ } else if (option->second.getType() == qpid::types::VAR_MAP) {
+ verify(option->second.asMap(), i->second.asMap());
+ }
+ }
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index da666feb92..d03370b597 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -41,6 +41,7 @@ class AddressHelper
const qpid::types::Variant::Map& getNodeProperties() const;
const qpid::types::Variant::Map& getLinkProperties() const;
+ static std::string getLinkName(const Address& address);
private:
bool isTemporary;
std::string createPolicy;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 5f6fda81c5..c9623f568c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -60,7 +60,10 @@ ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Va
if (pn_transport_bind(engine, connection)) {
//error
}
- pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option
+ if (identifier.empty()) {
+ identifier = qpid::types::Uuid(true).str();
+ }
+ pn_connection_set_container(connection, identifier.c_str());
bool enableTrace(false);
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 004ee4e775..64462215f3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -43,12 +43,8 @@ SessionContext::~SessionContext()
boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address)
{
- std::string name = address.getName();
-
- int count = 1;
- for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
- name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
- }
+ std::string name = AddressHelper::getLinkName(address);
+ if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
senders[name] = s;
return s;
@@ -56,12 +52,8 @@ boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messag
boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
{
- std::string name = address.getName();
-
- int count = 1;
- for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
- name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
- }
+ std::string name = AddressHelper::getLinkName(address);
+ if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
receivers[name] = r;
return r;