diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Connection.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 78 |
1 files changed, 12 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index e68c906cc2..df1a23f882 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -25,9 +25,9 @@ #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/AclModule.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/sys/SecuritySettings.h" -#include "qpid/sys/ClusterSafe.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -86,20 +86,14 @@ Connection::Connection(ConnectionOutputHandler* out_, std::string& mgmtId_, const qpid::sys::SecuritySettings& external, bool link_, - uint64_t objectId_, - bool shadow_, - bool delayManagement, - bool authenticated_ + uint64_t objectId_ ) : ConnectionState(out_, broker_), securitySettings(external), - shadow(shadow_), - authenticated(authenticated_), adapter(*this, link_), link(link_), mgmtClosing(false), mgmtId(mgmtId_), - mgmtObject(0), links(broker_.getLinks()), agent(0), timer(broker_.getTimer()), @@ -109,11 +103,6 @@ Connection::Connection(ConnectionOutputHandler* out_, { outboundTracker.wrap(out); broker.getConnectionObservers().connection(*this); - // In a cluster, allow adding the management object to be delayed. - if (!delayManagement) addManagementObject(); -} - -void Connection::addManagementObject() { assert(agent == 0); assert(mgmtObject == 0); Manageable* parent = broker.GetVhostObject(); @@ -121,8 +110,7 @@ void Connection::addManagementObject() { agent = broker.getManagementAgent(); if (agent != 0) { // TODO set last bool true if system connection - mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false); - mgmtObject->set_shadow(shadow); + mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10")); agent->addObject(mgmtObject, objectId); } ConnectionState::setUrl(mgmtId); @@ -139,13 +127,11 @@ void Connection::requestIOProcessing(boost::function0<void> callback) Connection::~Connection() { if (mgmtObject != 0) { - mgmtObject->resourceDestroy(); - // In a cluster, Connections destroyed during shutdown are in - // a cluster-unsafe context. Don't raise an event in that case. - if (!link && isClusterSafe()) - agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); + if (!link) + agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties())); QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId() << " rhost:" << mgmtId ); + mgmtObject->resourceDestroy(); } broker.getConnectionObservers().closed(*this); @@ -188,8 +174,7 @@ bool isMessage(const AMQMethodBody* method) void Connection::recordFromServer(const framing::AMQFrame& frame) { - // Don't record management stats in cluster-unsafe contexts - if (mgmtObject != 0 && isClusterSafe()) + if (mgmtObject != 0) { qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); cStats->framesToClient += 1; @@ -203,8 +188,7 @@ void Connection::recordFromServer(const framing::AMQFrame& frame) void Connection::recordFromClient(const framing::AMQFrame& frame) { - // Don't record management stats in cluster-unsafe contexts - if (mgmtObject != 0 && isClusterSafe()) + if (mgmtObject != 0) { qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); cStats->framesFromClient += 1; @@ -279,28 +263,7 @@ void Connection::notifyConnectionForced(const string& text) void Connection::setUserId(const string& userId) { - // Account for changing userId - AclModule* acl = broker.getAcl(); - if (acl) - { - acl->setUserId(*this, userId); - } - ConnectionState::setUserId(userId); - // In a cluster, the cluster code will raise the connect event - // when the connection is replicated to the cluster. - if (!broker.isInCluster()) raiseConnectEvent(); -} - -void Connection::raiseConnectEvent() { - if (mgmtObject != 0) { - mgmtObject->set_authIdentity(userId); - agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId)); - } - - QPID_LOG_CAT(debug, model, "Create connection. user:" << userId - << " rhost:" << mgmtId ); - } void Connection::setUserProxyAuth(bool b) @@ -327,19 +290,6 @@ void Connection::close(connection::CloseCode code, const string& text) getOutput().close(); } -// Send a close to the client but keep the channels. Used by cluster. -void Connection::sendClose() { - if (heartbeatTimer) - heartbeatTimer->cancel(); - if (timeoutTimer) - timeoutTimer->cancel(); - if (linkHeartbeatTimer) { - linkHeartbeatTimer->cancel(); - } - adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); - getOutput().close(); -} - void Connection::idleOut(){} void Connection::idleIn(){} @@ -364,9 +314,6 @@ void Connection::closed(){ // Physically closed, suspend open sessions. void Connection::doIoCallbacks() { if (!isOpen()) return; // Don't process IO callbacks until we are open. ScopedLock<Mutex> l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. - ClusterUnsafeScope cus; while (!ioCallbacks.empty()) { boost::function0<void> cb = ioCallbacks.front(); ioCallbacks.pop(); @@ -413,9 +360,9 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject* Connection::GetManagementObject(void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&) @@ -499,7 +446,7 @@ void Connection::abort() void Connection::setHeartbeatInterval(uint16_t heartbeat) { setHeartbeat(heartbeat); - if (heartbeat > 0 && !isShadow()) { + if (heartbeat > 0) { if (!heartbeatTimer) { heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); timer.add(heartbeatTimer); @@ -535,7 +482,6 @@ void Connection::OutboundFrameTracker::close() { next->close(); } size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } void Connection::OutboundFrameTracker::abort() { next->abort(); } void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } -void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) { next->send(f); |