summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-05-21 21:40:49 +0000
committerTed Ross <tross@apache.org>2008-05-21 21:40:49 +0000
commit35d9dc572a918015c038245725b0f9894b13132a (patch)
treed9efecaeab11e12f0b2f2d87ff7f202383eaa6a0 /cpp/src/qpid/broker
parent28404c0026b5bed8ad4ad37d52cd4d3aab5c70bc (diff)
downloadqpid-python-35d9dc572a918015c038245725b0f9894b13132a.tar.gz
QPID-1087
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658886 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--cpp/src/qpid/broker/Broker.h9
-rw-r--r--cpp/src/qpid/broker/Connection.cpp144
-rw-r--r--cpp/src/qpid/broker/Connection.h36
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp12
-rw-r--r--cpp/src/qpid/broker/Link.cpp52
-rw-r--r--cpp/src/qpid/broker/Link.h21
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp55
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h7
11 files changed, 218 insertions, 137 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 4f7686aac4..2992ea45cf 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -361,18 +361,20 @@ void Broker::accept() {
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
const std::string& host, uint16_t port, bool /*useSsl*/,
- sys::ConnectionCodec::Factory* f,
- sys::ProtocolAccess* access)
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* f)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
}
void Broker::connect(
- const Url& url, sys::ConnectionCodec::Factory* f)
+ const Url& url,
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* f)
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
+ connect(addr.host, addr.port, false, failed, f);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 7092a86181..531817db83 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -44,7 +44,6 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/sys/ProtocolAccess.h"
#include <vector>
@@ -135,10 +134,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Create a connection to another broker. */
void connect(const std::string& host, uint16_t port, bool useSsl,
- sys::ConnectionCodec::Factory* =0,
- sys::ProtocolAccess* =0);
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* =0);
/** Create a connection to another broker. */
- void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+ void connect(const Url& url,
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* =0);
// TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
// For the present just return the first ProtocolFactory registered.
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 463193a346..ea3d3547f5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -47,44 +47,26 @@ using qpid::management::Args;
namespace qpid {
namespace broker {
-class Connection::MgmtClient : public Connection::MgmtWrapper
-{
- management::Client::shared_ptr mgmtClient;
-
-public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming);
- ~MgmtClient();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
-};
-
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
- adapter(*this, isLink),
+ adapter(*this, isLink_),
+ isLink(isLink_),
mgmtClosing(false),
- mgmtId(mgmtId_)
-{
- initMgmt();
-}
-
-void Connection::initMgmt(bool asLink)
+ mgmtId(mgmtId_),
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject ();
+ if (isLink)
+ links.notifyConnection (mgmtId, this);
+
if (parent != 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
- {
- if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
- } else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
- }
- }
+ mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
+ agent->addObject (mgmtObject);
}
}
@@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback)
}
-Connection::~Connection () {}
+Connection::~Connection ()
+{
+ if (mgmtObject.get() != 0)
+ mgmtObject->resourceDestroy();
+ if (isLink)
+ links.notifyClosed (mgmtId);
+}
void Connection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
-
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+
+ if (isLink)
+ recordFromServer(frame);
+ else
+ recordFromClient(frame);
+}
+
+void Connection::recordFromServer (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesToClient ();
+ mgmtObject->inc_bytesToClient (frame.size ());
+ }
+}
+
+void Connection::recordFromClient (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesFromClient ();
+ mgmtObject->inc_bytesFromClient (frame.size ());
+ }
+}
+
+string Connection::getAuthMechanism()
+{
+ if (!isLink)
+ return string("ANONYMOUS");
+
+ return links.getAuthMechanism(mgmtId);
+}
+
+string Connection::getAuthCredentials()
+{
+ if (!isLink)
+ return string();
+
+ return links.getAuthCredentials(mgmtId);
+}
+
+void Connection::notifyConnectionForced(const string& text)
+{
+ if (isLink)
+ links.notifyConnectionForced(mgmtId, text);
}
void Connection::close(
@@ -125,7 +153,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
- ptr_map_ptr(channels.begin())->handleDetach();
+ ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -147,10 +175,12 @@ bool Connection::doOutput()
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
ioCallback = 0;
- if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
- //then do other output as needed:
- return outputTasks.doOutput();
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+ else
+ //then do other output as needed:
+ return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
}catch(std::exception& e){
@@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) {
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+ return dynamic_pointer_cast<ManagementObject>(mgmtObject);
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
@@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
case management::Client::METHOD_CLOSE :
mgmtClosing = true;
- if (mgmtWrapper.get()) mgmtWrapper->closing();
+ if (mgmtObject.get()) mgmtObject->set_closing(1);
out->activateOutput();
status = Manageable::STATUS_OK;
break;
@@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
return status;
}
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
- ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming)
-{
- mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId, incoming));
- agent->addObject (mgmtClient);
-}
-
-Connection::MgmtClient::~MgmtClient()
-{
- if (mgmtClient.get () != 0)
- mgmtClient->resourceDestroy ();
-}
-
-void Connection::MgmtClient::received(framing::AMQFrame& frame)
-{
- if (mgmtClient.get () != 0)
- {
- mgmtClient->inc_framesFromClient ();
- mgmtClient->inc_bytesFromClient (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void Connection::MgmtClient::closing()
-{
- if (mgmtClient) mgmtClient->set_closing (1);
-}
-
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index dff1e0653b..e6e3d4d15e 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -43,13 +43,14 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
-#include "qpid/management/Link.h"
#include <boost/ptr_container/ptr_map.hpp>
namespace qpid {
namespace broker {
+class LinkRegistry;
+
class Connection : public sys::ConnectionInputHandler,
public ConnectionState
{
@@ -62,7 +63,10 @@ class Connection : public sys::ConnectionInputHandler,
SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+ void close(framing::ReplyCode code = 403,
+ const string& text = string(),
+ framing::ClassId classId = 0,
+ framing::MethodId methodId = 0);
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -78,38 +82,26 @@ class Connection : public sys::ConnectionInputHandler,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- void initMgmt(bool asLink = false);
void requestIOProcessing (boost::function0<void>);
+ void recordFromServer (framing::AMQFrame& frame);
+ void recordFromClient (framing::AMQFrame& frame);
+ std::string getAuthMechanism();
+ std::string getAuthCredentials();
+ void notifyConnectionForced(const std::string& text);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- /**
- * Connection may appear, for the purposes of management, as a
- * normal client initiated connection or as an agent initiated
- * inter-broker link. This wrapper abstracts the common interface
- * for both.
- */
- class MgmtWrapper
- {
- public:
- virtual ~MgmtWrapper(){}
- virtual void received(framing::AMQFrame& frame) = 0;
- virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
- virtual void closing() = 0;
- virtual void processPending(){}
- virtual void process(Connection&, const management::Args&){}
- };
- class MgmtClient;
-
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
- std::auto_ptr<MgmtWrapper> mgmtWrapper;
+ bool isLink;
bool mgmtClosing;
const std::string mgmtId;
boost::function0<void> ioCallback;
+ management::Client::shared_ptr mgmtObject;
+ LinkRegistry& links;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index cd015ce4f5..5de5a0230a 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
}
sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true, a);
+ return new amqp_0_10::Connection(out, broker, id, true);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index bf55ab3b88..5797495054 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -24,7 +24,6 @@
#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
-namespace sys { class ProtocolAccess; }
namespace broker {
class Broker;
@@ -38,7 +37,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory {
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
+ create(sys::OutputControl&, const std::string& id);
private:
Broker& broker;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 162664fb88..77a4d1a3de 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -26,6 +26,7 @@
#include "Connection.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/framing/constants.h"
#include "qpid/log/Statement.h"
using namespace qpid;
@@ -123,6 +124,10 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe
if (replyCode != 200) {
QPID_LOG(warning, "Client closed connection with " << replyCode << ": " << replyText);
}
+
+ if (replyCode == framing::connection::CONNECTION_FORCED)
+ connection.notifyConnectionForced(replyText);
+
client.closeOk();
connection.getOutput().close();
}
@@ -136,9 +141,10 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
- string response;
- server.startOk(FieldTable(), ANONYMOUS, response, en_US);
- connection.initMgmt(true);
+ string mechanism = connection.getAuthMechanism();
+ string response = connection.getAuthCredentials();
+
+ server.startOk(FieldTable(), mechanism, response, en_US);
}
void ConnectionHandler::Handler::secure(const string& /*challenge*/)
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index cd032495e2..6bcfcf77a3 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -51,13 +51,11 @@ Link::Link(LinkRegistry* _links,
: links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), broker(_broker), state(0),
- access(boost::bind(&Link::established, this),
- boost::bind(&Link::closed, this, _1, _2),
- boost::bind(&Link::setConnection, this, _1)),
visitCount(0),
currentInterval(1),
closing(false),
- channelCounter(1)
+ channelCounter(1),
+ connection(0)
{
if (parent != 0)
{
@@ -75,8 +73,9 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
- if (state == STATE_OPERATIONAL)
- access.close();
+ if (state == STATE_OPERATIONAL && connection != 0)
+ connection->close();
+
if (mgmtObject.get () != 0)
mgmtObject->resourceDestroy ();
}
@@ -95,13 +94,16 @@ void Link::setStateLH (int newState)
case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ case STATE_FAILED : mgmtObject->set_state("Failed"); break;
+ case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
}
}
void Link::startConnectionLH ()
{
try {
- broker->connect (host, port, useSsl, 0, &access);
+ broker->connect (host, port, useSsl,
+ boost::bind (&Link::closed, this, _1, _2));
setStateLH(STATE_CONNECTING);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
@@ -125,16 +127,21 @@ void Link::closed (int, std::string text)
{
Mutex::ScopedLock mutex(lock);
+ connection = 0;
+
if (state == STATE_OPERATIONAL)
QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
- connection.reset();
for (Bridges::iterator i = active.begin(); i != active.end(); i++)
created.push_back(*i);
active.clear();
- setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (state != STATE_FAILED)
+ {
+ setStateLH(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ }
+
if (closing)
destroy();
}
@@ -145,7 +152,10 @@ void Link::destroy ()
Bridges toDelete;
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- connection.reset();
+ if (connection)
+ connection->close(403, "closed by management");
+
+ setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
// corruption of the iterator caused by bridge deletion.
@@ -168,10 +178,7 @@ void Link::destroy ()
void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
-
created.push_back (bridge);
- if (state == STATE_OPERATIONAL && connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::cancel(Bridge::shared_ptr bridge)
@@ -197,6 +204,9 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
+ if (state != STATE_OPERATIONAL)
+ return;
+
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -207,12 +217,10 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection::shared_ptr c)
+void Link::setConnection(Connection* c)
{
Mutex::ScopedLock mutex(lock);
-
connection = c;
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::maintenanceVisit ()
@@ -231,6 +239,8 @@ void Link::maintenanceVisit ()
startConnectionLH();
}
}
+ else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
uint Link::nextChannel()
@@ -240,6 +250,14 @@ uint Link::nextChannel()
return channelCounter++;
}
+void Link::notifyConnectionForced(const string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ setStateLH(STATE_FAILED);
+ mgmtObject->set_lastError(text);
+}
+
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0)
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index c4eca86c19..de757d112e 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -27,7 +27,6 @@
#include "PersistableConfig.h"
#include "Bridge.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/ProtocolAccess.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Link.h"
@@ -57,7 +56,6 @@ namespace qpid {
management::Link::shared_ptr mgmtObject;
Broker* broker;
int state;
- sys::ProtocolAccess access;
uint32_t visitCount;
uint32_t currentInterval;
bool closing;
@@ -66,21 +64,20 @@ namespace qpid {
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
uint channelCounter;
- boost::shared_ptr<Connection> connection;
+ Connection* connection;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
static const int STATE_OPERATIONAL = 3;
+ static const int STATE_FAILED = 4;
+ static const int STATE_CLOSED = 5;
- static const uint32_t MAX_INTERVAL = 16;
+ static const uint32_t MAX_INTERVAL = 32;
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void established(); // Called when connection is created
- void closed(int, std::string); // Called when connection goes away
void destroy(); // Called when mgmt deletes this link
void ioThreadProcessing(); // Called on connection's IO thread by request
- void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
public:
typedef boost::shared_ptr<Link> shared_ptr;
@@ -106,6 +103,16 @@ namespace qpid {
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
+ void established(); // Called when connection is created
+ void closed(int, std::string); // Called when connection goes away
+ void setConnection(Connection*); // Set pointer to the AMQP Connection
+
+ string getAuthMechanism() { return authMechanism; }
+ string getUsername() { return username; }
+ string getPassword() { return password; }
+
+ void notifyConnectionForced(const std::string text);
+
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index be3c67077e..455cc8452e 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -27,7 +27,7 @@ using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
-#define LINK_MAINT_INTERVAL 5
+#define LINK_MAINT_INTERVAL 2
LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
{
@@ -185,3 +185,56 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
+void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ {
+ l->second->established();
+ l->second->setConnection(c);
+ }
+}
+
+void LinkRegistry::notifyClosed(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ l->second->closed(0, "Closed by peer");
+}
+
+void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ l->second->notifyConnectionForced(text);
+}
+
+std::string LinkRegistry::getAuthMechanism(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ return l->second->getAuthMechanism();
+ return string("ANONYMOUS");
+}
+
+std::string LinkRegistry::getAuthCredentials(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l == links.end())
+ return string();
+
+ string result;
+ result += '\0';
+ result += l->second->getUsername();
+ result += '\0';
+ result += l->second->getPassword();
+
+ return result;
+}
+
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 3c47954141..f902490ed3 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -34,6 +34,7 @@ namespace qpid {
namespace broker {
class Broker;
+ class Connection;
class LinkRegistry {
// Declare a timer task to manage the establishment of link connections and the
@@ -106,6 +107,12 @@ namespace broker {
* Return the message store used.
*/
MessageStore* getStore() const;
+
+ void notifyConnection (const std::string& key, Connection* c);
+ void notifyClosed (const std::string& key);
+ void notifyConnectionForced (const std::string& key, const std::string& text);
+ std::string getAuthMechanism (const std::string& key);
+ std::string getAuthCredentials (const std::string& key);
};
}
}