summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
committerAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
commita49decc7d56bdb704a5d1580058c0da57e9a9353 (patch)
treeaf0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/qpid/cluster
parent265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff)
downloadqpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz
Fix cluster broker crashes when management is active.
Cluser brokers were exiting with errors "modified cluster state outside cluster context" and "confirmed < (50+0) but only sent < (49+0)" Fix was to: - delay completion of incoming update till update connection closes. - delay addding new connections to managment until connection is announced. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp13
-rw-r--r--cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp45
-rw-r--r--cpp/src/qpid/cluster/Connection.h14
4 files changed, 49 insertions, 26 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5d13c1ad8f..7eb0798914 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 904565;
+const uint32_t Cluster::CLUSTER_VERSION = 956001;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
+ updateClosed(false),
error(*this)
{
// We give ownership of the timer to the broker and keep a plain pointer.
@@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
connectionSettings(settings)));
}
+// Called in network thread
+void Cluster::updateInClosed() {
+ Lock l(lock);
+ assert(!updateClosed);
+ updateClosed = true;
+ checkUpdateIn(l);
+}
+
// Called in update thread.
void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
@@ -879,6 +888,7 @@ void Cluster::updateInRetracted() {
void Cluster::checkUpdateIn(Lock& l) {
if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
failoverExchange->setUrls(getUrls(l));
@@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) {
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
+ updateClosed = false;
state = JOINER;
QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 0d8b55cf01..84dee27e94 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
+ void updateInClosed();
void updateInDone(const ClusterMap&);
void updateInRetracted();
@@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
- bool updateRetracted;
+ bool updateRetracted, updateClosed;
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 22e1db2036..42f800bd18 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -22,7 +22,6 @@
#include "UpdateClient.h"
#include "Cluster.h"
#include "UpdateReceiver.h"
-
#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -43,7 +42,6 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-
#include <boost/current_function.hpp>
@@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
- // Local clients are announced to the cluster
- // and initialized when the announce is received.
giveReadCredit(cluster.getSettings().readMax); // Flow control
- init();
+ // Delay adding the connection to the management map until announce()
+ connectionCtor.delayManagement = true;
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- init();
- }
- QPID_LOG(info, "incoming connection " << *this);
+ }
+ init();
+ QPID_LOG(debug, cluster << " local connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -152,8 +149,11 @@ void Connection::announce(
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- // Local connections are already initialized.
- if (isShadow()) {
+ // Local connections are already initialized but with management delayed.
+ if (isLocalClient()) {
+ connection->addManagementObject();
+ }
+ else if (isShadow()) {
init();
// Play initial frames into the connection.
Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
@@ -162,8 +162,9 @@ void Connection::announce(
connection->received(frame);
connection->setUserId(username);
}
- // Raise the connection management event now that the connection is replicated.
+ // Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
+ QPID_LOG(debug, cluster << " replicated connection " << *this);
}
Connection::~Connection() {
@@ -249,6 +250,7 @@ void Connection::closed() {
if (isUpdated()) {
QPID_LOG(debug, cluster << " update connection closed " << *this);
close();
+ cluster.updateInClosed();
}
else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
@@ -259,7 +261,8 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self);
}
}
catch (const std::exception& e) {
@@ -268,17 +271,21 @@ void Connection::closed() {
}
// Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
- assert(!catchUp);
- close();
+void Connection::deliverClose (bool aborted) {
+ QPID_LOG(debug, cluster << " replicated close of " << *this);
+ if (connection.get()) {
+ if (aborted) connection->abort();
+ else connection->closed();
+ connection.reset();
+ }
cluster.erase(self);
}
// Close the connection
void Connection::close() {
+ QPID_LOG(debug, cluster << " local close of " << *this);
if (connection.get()) {
connection->closed();
- // Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
}
@@ -286,11 +293,9 @@ void Connection::close() {
// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
- connection->abort();
- // Ensure we delete the broker::Connection in the deliver thread.
- connection.reset();
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self);
}
- cluster.erase(self);
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 45d832a5ff..72a98c12f1 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -170,7 +170,7 @@ class Connection :
const std::string& initFrames);
void close();
void abort();
- void deliverClose();
+ void deliverClose(bool);
OutputInterceptor& getOutput() { return output; }
@@ -194,6 +194,7 @@ class Connection :
bool isLink;
uint64_t objectId;
bool shadow;
+ bool delayManagement;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
@@ -202,14 +203,19 @@ class Connection :
const qpid::sys::SecuritySettings& external_,
bool isLink_=false,
uint64_t objectId_=0,
- bool shadow_=false
+ bool shadow_=false,
+ bool delayManagement_=false
) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
- isLink(isLink_), objectId(objectId_), shadow(shadow_)
+ isLink(isLink_), objectId(objectId_), shadow(shadow_),
+ delayManagement(delayManagement_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
- new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
+ new broker::Connection(
+ out, broker, mgmtId, external, isLink, objectId,
+ shadow, delayManagement)
+ );
}
};