diff options
| author | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000 |
| commit | a49decc7d56bdb704a5d1580058c0da57e9a9353 (patch) | |
| tree | af0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/qpid/cluster | |
| parent | 265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff) | |
| download | qpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz | |
Fix cluster broker crashes when management is active.
Cluser brokers were exiting with errors "modified cluster state
outside cluster context" and "confirmed < (50+0) but only sent < (49+0)"
Fix was to:
- delay completion of incoming update till update connection closes.
- delay addding new connections to managment until connection is announced.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 14 |
4 files changed, 49 insertions, 26 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5d13c1ad8f..7eb0798914 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 904565; +const uint32_t Cluster::CLUSTER_VERSION = 956001; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastAliveCount(0), lastBroker(false), updateRetracted(false), + updateClosed(false), error(*this) { // We give ownership of the timer to the broker and keep a plain pointer. @@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { connectionSettings(settings))); } +// Called in network thread +void Cluster::updateInClosed() { + Lock l(lock); + assert(!updateClosed); + updateClosed = true; + checkUpdateIn(l); +} + // Called in update thread. void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); @@ -879,6 +888,7 @@ void Cluster::updateInRetracted() { void Cluster::checkUpdateIn(Lock& l) { if (state != UPDATEE) return; // Wait till we reach the stall point. + if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; failoverExchange->setUrls(getUrls(l)); @@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) { } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; + updateClosed = false; state = JOINER; QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 0d8b55cf01..84dee27e94 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread + void updateInClosed(); void updateInDone(const ClusterMap&); void updateInRetracted(); @@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - bool updateRetracted; + bool updateRetracted, updateClosed; ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 22e1db2036..42f800bd18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -22,7 +22,6 @@ #include "UpdateClient.h" #include "Cluster.h" #include "UpdateReceiver.h" - #include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -43,7 +42,6 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" - #include <boost/current_function.hpp> @@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, { cluster.addLocalConnection(this); if (isLocalClient()) { - // Local clients are announced to the cluster - // and initialized when the announce is received. giveReadCredit(cluster.getSettings().readMax); // Flow control - init(); + // Delay adding the connection to the management map until announce() + connectionCtor.delayManagement = true; } else { // Catch-up shadow connections initialized using nextShadow id. @@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - init(); - } - QPID_LOG(info, "incoming connection " << *this); + } + init(); + QPID_LOG(debug, cluster << " local connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -152,8 +149,11 @@ void Connection::announce( QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized. - if (isShadow()) { + // Local connections are already initialized but with management delayed. + if (isLocalClient()) { + connection->addManagementObject(); + } + else if (isShadow()) { init(); // Play initial frames into the connection. Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); @@ -162,8 +162,9 @@ void Connection::announce( connection->received(frame); connection->setUserId(username); } - // Raise the connection management event now that the connection is replicated. + // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); + QPID_LOG(debug, cluster << " replicated connection " << *this); } Connection::~Connection() { @@ -249,6 +250,7 @@ void Connection::closed() { if (isUpdated()) { QPID_LOG(debug, cluster << " update connection closed " << *this); close(); + cluster.updateInClosed(); } else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); @@ -259,7 +261,8 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self); } } catch (const std::exception& e) { @@ -268,17 +271,21 @@ void Connection::closed() { } // Self-delivery of close message, close the connection. -void Connection::deliverClose () { - assert(!catchUp); - close(); +void Connection::deliverClose (bool aborted) { + QPID_LOG(debug, cluster << " replicated close of " << *this); + if (connection.get()) { + if (aborted) connection->abort(); + else connection->closed(); + connection.reset(); + } cluster.erase(self); } // Close the connection void Connection::close() { + QPID_LOG(debug, cluster << " local close of " << *this); if (connection.get()) { connection->closed(); - // Ensure we delete the broker::Connection in the deliver thread. connection.reset(); } } @@ -286,11 +293,9 @@ void Connection::close() { // The connection has been killed for misbehaving, called in connection thread. void Connection::abort() { if (connection.get()) { - connection->abort(); - // Ensure we delete the broker::Connection in the deliver thread. - connection.reset(); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self); } - cluster.erase(self); } // ConnectionCodec::decode receives read buffers from directly-connected clients. diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 45d832a5ff..72a98c12f1 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -170,7 +170,7 @@ class Connection : const std::string& initFrames); void close(); void abort(); - void deliverClose(); + void deliverClose(bool); OutputInterceptor& getOutput() { return output; } @@ -194,6 +194,7 @@ class Connection : bool isLink; uint64_t objectId; bool shadow; + bool delayManagement; ConnectionCtor( sys::ConnectionOutputHandler* out_, @@ -202,14 +203,19 @@ class Connection : const qpid::sys::SecuritySettings& external_, bool isLink_=false, uint64_t objectId_=0, - bool shadow_=false + bool shadow_=false, + bool delayManagement_=false ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), - isLink(isLink_), objectId(objectId_), shadow(shadow_) + isLink(isLink_), objectId(objectId_), shadow(shadow_), + delayManagement(delayManagement_) {} std::auto_ptr<broker::Connection> construct() { return std::auto_ptr<broker::Connection>( - new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow)); + new broker::Connection( + out, broker, mgmtId, external, isLink, objectId, + shadow, delayManagement) + ); } }; |
