diff options
author | Alan Conway <aconway@apache.org> | 2010-06-16 20:32:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-06-16 20:32:04 +0000 |
commit | a6801aa6ac2c6d97b6747ef7bd7d2264be9c58ab (patch) | |
tree | d4368c5338c635d2cd7a7c40c576636b18098ed6 /cpp/src | |
parent | 2273c62236d666ab677d964591f564f81908d6ad (diff) | |
download | qpid-python-a6801aa6ac2c6d97b6747ef7bd7d2264be9c58ab.tar.gz |
Bug 603835 - cluster_tests.test_management failing.
Clean up connections causing extra connection objects in the mangement agent map.
- update connection was not being closed.
- connections belonging to members that left the cluster were not fully cleaned up
Also fixed test errors making failover_soak fail sporadically.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@955370 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 24 | ||||
-rw-r--r-- | cpp/src/tests/failover_soak.cpp | 9 |
6 files changed, 53 insertions, 37 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9ca6fbf2bf..5d13c1ad8f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -361,7 +361,6 @@ void Cluster::erase(const ConnectionId& id) { // Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id, Lock&) { - QPID_LOG(info, *this << " connection closed " << id); connections.erase(id); decoder.erase(id); } @@ -1024,7 +1023,7 @@ void Cluster::memberUpdate(Lock& l) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); if (m != self && !map.isMember(m)) { - j->second->getBrokerConnection().closed(); + j->second->close(); erase(j->second->getId(), l); } } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index c402415fab..22e1db2036 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -101,19 +101,18 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (isLocalClient()) { // Local clients are announced to the cluster // and initialized when the announce is received. - QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); // Flow control init(); } else { // Catch-up shadow connections initialized using nextShadow id. assert(catchUp); - QPID_LOG(info, "new catch-up connection " << *this); - connectionCtor.mgmtId = updateIn.nextShadowMgmtId; + if (!updateIn.nextShadowMgmtId.empty()) + connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); init(); } - + QPID_LOG(info, "incoming connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -123,8 +122,6 @@ void Connection::setSecureConnection(broker::SecureConnection* sc) { void Connection::init() { connection = connectionCtor.construct(); - QPID_LOG(debug, cluster << " initialized connection: " << *this - << " ssf=" << connection->getExternalSecuritySettings().ssf); if (isLocalClient()) { if (secureConnection) connection->setSecureConnection(secureConnection); // Actively send cluster-order frames from local node @@ -171,7 +168,6 @@ void Connection::announce( Connection::~Connection() { if (connection.get()) connection->setErrorListener(0); - QPID_LOG(debug, cluster << " deleted connection: " << *this); } bool Connection::doOutput() { @@ -250,16 +246,15 @@ void Connection::deliveredFrame(const EventFrame& f) { // A local connection is closed by the network layer. void Connection::closed() { try { - if (catchUp) { + if (isUpdated()) { + QPID_LOG(debug, cluster << " update connection closed " << *this); + close(); + } + else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isUpdated()) { - QPID_LOG(debug, cluster << " closed update connection " << *this); - if (connection.get()) connection->closed(); - } else if (isLocal()) { - QPID_LOG(debug, cluster << " local close of replicated connection " << *this); // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. @@ -275,15 +270,20 @@ void Connection::closed() { // Self-delivery of close message, close the connection. void Connection::deliverClose () { assert(!catchUp); + close(); + cluster.erase(self); +} + +// Close the connection +void Connection::close() { if (connection.get()) { connection->closed(); // Ensure we delete the broker::Connection in the deliver thread. connection.reset(); } - cluster.erase(self); } -// The connection has been killed for misbehaving +// The connection has been killed for misbehaving, called in connection thread. void Connection::abort() { if (connection.get()) { connection->abort(); @@ -424,7 +424,7 @@ void Connection::shadowReady( uint64_t memberId, uint64_t connectionId, const string& mgmtId, const string& username, const string& fragment, uint32_t sendMax) { - QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId()); + QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId()); ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); @@ -442,13 +442,19 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); updateIn.consumerNumbering.clear(); - self.second = 0; // Mark this as completed update connection. + closeUpdated(); } void Connection::retractOffer() { QPID_LOG(info, cluster << " incoming update retracted on connection " << *this); cluster.updateInRetracted(); - self.second = 0; // Mark this as completed update connection. + closeUpdated(); +} + +void Connection::closeUpdated() { + self.second = 0; // Mark this as completed update connection. + if (connection.get()) + connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } bool Connection::isLocal() const { @@ -527,7 +533,10 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { if (c.isLocal()) type = "local"; else if (c.isShadow()) type = "shadow"; else if (c.isUpdated()) type = "updated"; - return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; + const broker::Connection* bc = c.getBrokerConnection(); + if (bc) o << bc->getMgmtId(); + else o << "<disconnected>"; + return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")"; } void Connection::txStart() { diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 70c4d0e2a3..45d832a5ff 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -75,7 +75,8 @@ class Connection : ~Connection(); ConnectionId getId() const { return self; } - broker::Connection& getBrokerConnection() { return *connection; } + broker::Connection* getBrokerConnection() { return connection.get(); } + const broker::Connection* getBrokerConnection() const { return connection.get(); } /** Local connections may be clients or catch-up connections */ bool isLocal() const; @@ -167,6 +168,7 @@ class Connection : void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict, const std::string& username, const std::string& initFrames); + void close(); void abort(); void deliverClose(); @@ -227,6 +229,7 @@ class Connection : broker::SessionState& sessionState(); broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); + void closeUpdated(); Cluster& cluster; ConnectionId self; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index f80eb9c434..1354dab17b 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -83,7 +83,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) { newLimit = (sendMax + sent) / 2; } sent = 0; - while (sent < limit && parent.getBrokerConnection().doOutput()) + while (sent < limit && parent.getBrokerConnection()->doOutput()) ++sent; if (sent == limit) sendDoOutput(newLimit); } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 6499519187..cb296ab8da 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -150,7 +150,8 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + std::for_each(connections.begin(), connections.end(), + boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); session.close(); @@ -167,15 +168,18 @@ void UpdateClient::update() { client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); - connection.close(); - QPID_LOG(debug, updaterId << " update completed to " << updateeId - << " at " << updateeUrl << ": " << membership); + // FIXME aconway 2010-06-16: Connection will be closed from the other end. + // connection.close(); + // FIXME aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // - sys::usleep(10*1000); // 100ms + sys::usleep(10*1000); + + QPID_LOG(debug, updaterId << " update completed to " << updateeId + << " at " << updateeUrl << ": " << membership); } namespace { @@ -347,9 +351,11 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); - + assert(updateConnection->getBrokerConnection()); + broker::Connection& bc = *updateConnection->getBrokerConnection(); + // Send the management ID first on the main connection. - std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId(); + std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); // Make sure its received before opening shadow connection session.sync(); @@ -357,7 +363,6 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda // Open shadow connection and update it. shadowConnection = catchUpConnection(); - broker::Connection& bc = updateConnection->getBrokerConnection(); connectionSettings.maxFrameSize = bc.getFrameMax(); shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); @@ -381,8 +386,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() - << "[" << sh.getChannel() << "] = " << ss->getId()); + QPID_LOG(debug, updaterId << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp index f61a565f50..c2ac36a757 100644 --- a/cpp/src/tests/failover_soak.cpp +++ b/cpp/src/tests/failover_soak.cpp @@ -673,7 +673,7 @@ main ( int argc, char const ** argv ) // Get prefix for each queue name. stringstream queue_prefix; queue_prefix << "failover_soak_" << getpid(); - + string queue_prefix_str(queue_prefix.str()); // Run the declareQueues child. int childStatus; @@ -683,7 +683,7 @@ main ( int argc, char const ** argv ) declareQueuesPath, verbosity, durable, - queue_prefix.str().c_str(), + queue_prefix_str.c_str(), n_queues ); if ( -1 == dqClientPid ) { @@ -707,6 +707,7 @@ main ( int argc, char const ** argv ) stringstream queue_name; queue_name << queue_prefix.str() << '_' << i; + string queue_name_str(queue_name.str()); // Receiving client --------------------------- pid_t receivingClientPid = @@ -715,7 +716,7 @@ main ( int argc, char const ** argv ) receiverPath, reportFrequency, verbosity, - queue_name.str().c_str() ); + queue_name_str.c_str() ); if ( -1 == receivingClientPid ) { cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; return CANT_FORK_RECEIVER; @@ -731,7 +732,7 @@ main ( int argc, char const ** argv ) reportFrequency, verbosity, durable, - queue_name.str().c_str() ); + queue_name_str.c_str() ); if ( -1 == sendingClientPid ) { cerr << "END_OF_TEST ERROR_START_SENDER\n"; return CANT_FORK_SENDER; |