diff options
| author | Ted Ross <tross@apache.org> | 2008-05-21 21:40:49 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-05-21 21:40:49 +0000 |
| commit | 35d9dc572a918015c038245725b0f9894b13132a (patch) | |
| tree | d9efecaeab11e12f0b2f2d87ff7f202383eaa6a0 /cpp/src/qpid/broker | |
| parent | 28404c0026b5bed8ad4ad37d52cd4d3aab5c70bc (diff) | |
| download | qpid-python-35d9dc572a918015c038245725b0f9894b13132a.tar.gz | |
QPID-1087
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658886 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 144 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 36 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.h | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 7 |
11 files changed, 218 insertions, 137 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4f7686aac4..2992ea45cf 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -361,18 +361,20 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( const std::string& host, uint16_t port, bool /*useSsl*/, - sys::ConnectionCodec::Factory* f, - sys::ProtocolAccess* access) + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed); } void Broker::connect( - const Url& url, sys::ConnectionCodec::Factory* f) + const Url& url, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); + connect(addr.host, addr.port, false, failed, f); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 7092a86181..531817db83 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -44,7 +44,6 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/ProtocolAccess.h" #include <vector> @@ -135,10 +134,12 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Create a connection to another broker. */ void connect(const std::string& host, uint16_t port, bool useSsl, - sys::ConnectionCodec::Factory* =0, - sys::ProtocolAccess* =0); + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* =0); /** Create a connection to another broker. */ - void connect(const Url& url, sys::ConnectionCodec::Factory* =0); + void connect(const Url& url, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* =0); // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed // For the present just return the first ProtocolFactory registered. diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 463193a346..ea3d3547f5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -47,44 +47,26 @@ using qpid::management::Args; namespace qpid { namespace broker { -class Connection::MgmtClient : public Connection::MgmtWrapper -{ - management::Client::shared_ptr mgmtClient; - -public: - MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming); - ~MgmtClient(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); -}; - -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), - adapter(*this, isLink), + adapter(*this, isLink_), + isLink(isLink_), mgmtClosing(false), - mgmtId(mgmtId_) -{ - initMgmt(); -} - -void Connection::initMgmt(bool asLink) + mgmtId(mgmtId_), + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject (); + if (isLink) + links.notifyConnection (mgmtId, this); + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) - { - if (asLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false)); - } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true)); - } - } + mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); + agent->addObject (mgmtObject); } } @@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback) } -Connection::~Connection () {} +Connection::~Connection () +{ + if (mgmtObject.get() != 0) + mgmtObject->resourceDestroy(); + if (isLink) + links.notifyClosed (mgmtId); +} void Connection::received(framing::AMQFrame& frame){ - if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); - if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } - - if (mgmtWrapper.get()) mgmtWrapper->received(frame); + + if (isLink) + recordFromServer(frame); + else + recordFromClient(frame); +} + +void Connection::recordFromServer (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesToClient (); + mgmtObject->inc_bytesToClient (frame.size ()); + } +} + +void Connection::recordFromClient (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } +} + +string Connection::getAuthMechanism() +{ + if (!isLink) + return string("ANONYMOUS"); + + return links.getAuthMechanism(mgmtId); +} + +string Connection::getAuthCredentials() +{ + if (!isLink) + return string(); + + return links.getAuthCredentials(mgmtId); +} + +void Connection::notifyConnectionForced(const string& text) +{ + if (isLink) + links.notifyConnectionForced(mgmtId, text); } void Connection::close( @@ -125,7 +153,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) - ptr_map_ptr(channels.begin())->handleDetach(); + ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -147,10 +175,12 @@ bool Connection::doOutput() if (ioCallback) ioCallback(); // Lend the IO thread for management processing ioCallback = 0; - if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); - //then do other output as needed: - return outputTasks.doOutput(); + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + else + //then do other output as needed: + return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.what(), 0, 0); }catch(std::exception& e){ @@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) { ManagementObject::shared_ptr Connection::GetManagementObject (void) const { - return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); + return dynamic_pointer_cast<ManagementObject>(mgmtObject); } Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) @@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) { case management::Client::METHOD_CLOSE : mgmtClosing = true; - if (mgmtWrapper.get()) mgmtWrapper->closing(); + if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); status = Manageable::STATUS_OK; break; @@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) return status; } -Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, - ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming) -{ - mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId, incoming)); - agent->addObject (mgmtClient); -} - -Connection::MgmtClient::~MgmtClient() -{ - if (mgmtClient.get () != 0) - mgmtClient->resourceDestroy (); -} - -void Connection::MgmtClient::received(framing::AMQFrame& frame) -{ - if (mgmtClient.get () != 0) - { - mgmtClient->inc_framesFromClient (); - mgmtClient->inc_bytesFromClient (frame.size ()); - } -} - -management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtClient); -} - -void Connection::MgmtClient::closing() -{ - if (mgmtClient) mgmtClient->set_closing (1); -} - }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index dff1e0653b..e6e3d4d15e 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -43,13 +43,14 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Client.h" -#include "qpid/management/Link.h" #include <boost/ptr_container/ptr_map.hpp> namespace qpid { namespace broker { +class LinkRegistry; + class Connection : public sys::ConnectionInputHandler, public ConnectionState { @@ -62,7 +63,10 @@ class Connection : public sys::ConnectionInputHandler, SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + void close(framing::ReplyCode code = 403, + const string& text = string(), + framing::ClassId classId = 0, + framing::MethodId methodId = 0); // ConnectionInputHandler methods void received(framing::AMQFrame& frame); @@ -78,38 +82,26 @@ class Connection : public sys::ConnectionInputHandler, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - void initMgmt(bool asLink = false); void requestIOProcessing (boost::function0<void>); + void recordFromServer (framing::AMQFrame& frame); + void recordFromClient (framing::AMQFrame& frame); + std::string getAuthMechanism(); + std::string getAuthCredentials(); + void notifyConnectionForced(const std::string& text); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - /** - * Connection may appear, for the purposes of management, as a - * normal client initiated connection or as an agent initiated - * inter-broker link. This wrapper abstracts the common interface - * for both. - */ - class MgmtWrapper - { - public: - virtual ~MgmtWrapper(){} - virtual void received(framing::AMQFrame& frame) = 0; - virtual management::ManagementObject::shared_ptr getManagementObject() const = 0; - virtual void closing() = 0; - virtual void processPending(){} - virtual void process(Connection&, const management::Args&){} - }; - class MgmtClient; - ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; - std::auto_ptr<MgmtWrapper> mgmtWrapper; + bool isLink; bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; + management::Client::shared_ptr mgmtObject; + LinkRegistry& links; }; }} diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index cd015ce4f5..5de5a0230a 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: } sys::ConnectionCodec* -ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) { +ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { // used to create connections from one broker to another - return new amqp_0_10::Connection(out, broker, id, true, a); + return new amqp_0_10::Connection(out, broker, id, true); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index bf55ab3b88..5797495054 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -24,7 +24,6 @@ #include "qpid/sys/ConnectionCodec.h" namespace qpid { -namespace sys { class ProtocolAccess; } namespace broker { class Broker; @@ -38,7 +37,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory { create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* - create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0); + create(sys::OutputControl&, const std::string& id); private: Broker& broker; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 162664fb88..77a4d1a3de 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -26,6 +26,7 @@ #include "Connection.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/framing/constants.h" #include "qpid/log/Statement.h" using namespace qpid; @@ -123,6 +124,10 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe if (replyCode != 200) { QPID_LOG(warning, "Client closed connection with " << replyCode << ": " << replyText); } + + if (replyCode == framing::connection::CONNECTION_FORCED) + connection.notifyConnectionForced(replyText); + client.closeOk(); connection.getOutput().close(); } @@ -136,9 +141,10 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { - string response; - server.startOk(FieldTable(), ANONYMOUS, response, en_US); - connection.initMgmt(true); + string mechanism = connection.getAuthMechanism(); + string response = connection.getAuthCredentials(); + + server.startOk(FieldTable(), mechanism, response, en_US); } void ConnectionHandler::Handler::secure(const string& /*challenge*/) diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index cd032495e2..6bcfcf77a3 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -51,13 +51,11 @@ Link::Link(LinkRegistry* _links, : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), broker(_broker), state(0), - access(boost::bind(&Link::established, this), - boost::bind(&Link::closed, this, _1, _2), - boost::bind(&Link::setConnection, this, _1)), visitCount(0), currentInterval(1), closing(false), - channelCounter(1) + channelCounter(1), + connection(0) { if (parent != 0) { @@ -75,8 +73,9 @@ Link::Link(LinkRegistry* _links, Link::~Link () { - if (state == STATE_OPERATIONAL) - access.close(); + if (state == STATE_OPERATIONAL && connection != 0) + connection->close(); + if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } @@ -95,13 +94,16 @@ void Link::setStateLH (int newState) case STATE_WAITING : mgmtObject->set_state("Waiting"); break; case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + case STATE_FAILED : mgmtObject->set_state("Failed"); break; + case STATE_CLOSED : mgmtObject->set_state("Closed"); break; } } void Link::startConnectionLH () { try { - broker->connect (host, port, useSsl, 0, &access); + broker->connect (host, port, useSsl, + boost::bind (&Link::closed, this, _1, _2)); setStateLH(STATE_CONNECTING); } catch(std::exception& e) { setStateLH(STATE_WAITING); @@ -125,16 +127,21 @@ void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); + connection = 0; + if (state == STATE_OPERATIONAL) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); - connection.reset(); for (Bridges::iterator i = active.begin(); i != active.end(); i++) created.push_back(*i); active.clear(); - setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (state != STATE_FAILED) + { + setStateLH(STATE_WAITING); + mgmtObject->set_lastError (text); + } + if (closing) destroy(); } @@ -145,7 +152,10 @@ void Link::destroy () Bridges toDelete; QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - connection.reset(); + if (connection) + connection->close(403, "closed by management"); + + setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no // corruption of the iterator caused by bridge deletion. @@ -168,10 +178,7 @@ void Link::destroy () void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); - created.push_back (bridge); - if (state == STATE_OPERATIONAL && connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::cancel(Bridge::shared_ptr bridge) @@ -197,6 +204,9 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); + if (state != STATE_OPERATIONAL) + return; + //process any pending creates if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { @@ -207,12 +217,10 @@ void Link::ioThreadProcessing() } } -void Link::setConnection(Connection::shared_ptr c) +void Link::setConnection(Connection* c) { Mutex::ScopedLock mutex(lock); - connection = c; - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::maintenanceVisit () @@ -231,6 +239,8 @@ void Link::maintenanceVisit () startConnectionLH(); } } + else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } uint Link::nextChannel() @@ -240,6 +250,14 @@ uint Link::nextChannel() return channelCounter++; } +void Link::notifyConnectionForced(const string text) +{ + Mutex::ScopedLock mutex(lock); + + setStateLH(STATE_FAILED); + mgmtObject->set_lastError(text); +} + void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index c4eca86c19..de757d112e 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -27,7 +27,6 @@ #include "PersistableConfig.h" #include "Bridge.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/ProtocolAccess.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "qpid/management/Link.h" @@ -57,7 +56,6 @@ namespace qpid { management::Link::shared_ptr mgmtObject; Broker* broker; int state; - sys::ProtocolAccess access; uint32_t visitCount; uint32_t currentInterval; bool closing; @@ -66,21 +64,20 @@ namespace qpid { Bridges created; // Bridges pending creation Bridges active; // Bridges active uint channelCounter; - boost::shared_ptr<Connection> connection; + Connection* connection; static const int STATE_WAITING = 1; static const int STATE_CONNECTING = 2; static const int STATE_OPERATIONAL = 3; + static const int STATE_FAILED = 4; + static const int STATE_CLOSED = 5; - static const uint32_t MAX_INTERVAL = 16; + static const uint32_t MAX_INTERVAL = 32; void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void established(); // Called when connection is created - void closed(int, std::string); // Called when connection goes away void destroy(); // Called when mgmt deletes this link void ioThreadProcessing(); // Called on connection's IO thread by request - void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection public: typedef boost::shared_ptr<Link> shared_ptr; @@ -106,6 +103,16 @@ namespace qpid { void add(Bridge::shared_ptr); void cancel(Bridge::shared_ptr); + void established(); // Called when connection is created + void closed(int, std::string); // Called when connection goes away + void setConnection(Connection*); // Set pointer to the AMQP Connection + + string getAuthMechanism() { return authMechanism; } + string getUsername() { return username; } + string getPassword() { return password; } + + void notifyConnectionForced(const std::string text); + // PersistableConfig: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index be3c67077e..455cc8452e 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -27,7 +27,7 @@ using std::pair; using std::stringstream; using boost::intrusive_ptr; -#define LINK_MAINT_INTERVAL 5 +#define LINK_MAINT_INTERVAL 2 LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) { @@ -185,3 +185,56 @@ MessageStore* LinkRegistry::getStore() const { return store; } +void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + { + l->second->established(); + l->second->setConnection(c); + } +} + +void LinkRegistry::notifyClosed(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + l->second->closed(0, "Closed by peer"); +} + +void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + l->second->notifyConnectionForced(text); +} + +std::string LinkRegistry::getAuthMechanism(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + return l->second->getAuthMechanism(); + return string("ANONYMOUS"); +} + +std::string LinkRegistry::getAuthCredentials(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l == links.end()) + return string(); + + string result; + result += '\0'; + result += l->second->getUsername(); + result += '\0'; + result += l->second->getPassword(); + + return result; +} + + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 3c47954141..f902490ed3 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -34,6 +34,7 @@ namespace qpid { namespace broker { class Broker; + class Connection; class LinkRegistry { // Declare a timer task to manage the establishment of link connections and the @@ -106,6 +107,12 @@ namespace broker { * Return the message store used. */ MessageStore* getStore() const; + + void notifyConnection (const std::string& key, Connection* c); + void notifyClosed (const std::string& key); + void notifyConnectionForced (const std::string& key, const std::string& text); + std::string getAuthMechanism (const std::string& key); + std::string getAuthCredentials (const std::string& key); }; } } |
