summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-08 23:52:35 +0000
committerAlan Conway <aconway@apache.org>2009-03-08 23:52:35 +0000
commitbab8070ad7989386b11f4106d9f15e73d9246c1d (patch)
treedfca93bc4ca56de921c01f87e966855816744484 /cpp/src/qpid/cluster/Cluster.cpp
parente14ed937f459d07735a5ed22636127fdf81dc88c (diff)
downloadqpid-python-bab8070ad7989386b11f4106d9f15e73d9246c1d.tar.gz
Fixed race conditions in cluster.
Execute all cluster logic in frameDeliverQueue thread, decoding only in eventDeliverQueue thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp178
1 files changed, 100 insertions, 78 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8946a71446..169d0fb1af 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -107,11 +107,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- connections(*this),
- frameId(0),
initialized(false),
+ decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+ discarding(true),
state(INIT),
- eventId(0),
+ frameId(0),
lastSize(0),
lastBroker(false)
{
@@ -156,14 +156,19 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c);
+ localConnections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c);
+ // Safe to use connections here because we're pre-catchup, either
+ // discarding or stalled, so deliveredFrame is not processing any
+ // connection events.
+ assert(discarding);
+ connections.insert(ConnectionMap::value_type(c->getId(), c));
}
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id) {
connections.erase(id);
}
@@ -195,7 +200,6 @@ void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -217,58 +221,89 @@ void Cluster::deliver(
Event e(Event::decodeCopy(from, buf));
if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
- deliver(e);
+ deliverEvent(e);
}
-void Cluster::deliver(const Event& e) {
+void Cluster::deliverEvent(const Event& e) {
deliverEventQueue.push(e);
}
+void Cluster::deliverFrame(const EventFrame& e) {
+ deliverFrameQueue.push(e);
+}
+
// Handler for deliverEventQueue.
-// This thread executes cluster controls and decodes connection data events.
-void Cluster::deliveredEvent(const Event& event) {
- Event e(event);
- Mutex::ScopedLock l(lock);
- if (state >= CATCHUP) {
- e.setId(++eventId);
+// This thread decodes frames from events.
+void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DLVR: " << e);
- }
- if (e.isCluster()) { // Cluster control, process in this thread.
+ if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
- QPID_LOG(trace, *this << " DLVR: " << ef);
- ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
- if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
+ // Stop the deliverEventQueue on update offers.
+ // This preserves the connection decoder fragments for an update.
+ ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+ if (offer)
+ deliverEventQueue.stop();
+ deliverFrame(ef);
}
- else if (state >= CATCHUP) { // Handle connection frames
- if (e.getType() == CONTROL)
- connectionFrame(EventFrame(e, e.getFrame()));
+ else if(!discarding) {
+ if (e.isControl())
+ deliverFrame(EventFrame(e, e.getFrame()));
else
- connections.decode(e, e.getData());
- }
- // Drop connection frames while state < CATCHUP
+ decoder.decode(e, e.getData());
}
-
-void Cluster::connectionFrame(const EventFrame& frame) {
- deliverFrameQueue.push(frame);
+ else // Discard connection events if discarding is set.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
// Handler for deliverFrameQueue.
-// This thread executes connection control and data frames.
-void Cluster::deliveredFrame(const EventFrame& event) {
- // No lock, only use connections, not Cluster state.
- EventFrame e(event);
- if(!e.frame.getBody()) { // marks the stall point, start the update task.
- updateThread=Thread(*updateTask);
+// This thread executes the main logic.
+void Cluster::deliveredFrame(const EventFrame& e) {
+ Mutex::ScopedLock l(lock);
+ if (e.isCluster()) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
+ if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else {
+ else if (state >= CATCHUP) {
QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Add cluster-id to to data frames.
- e.frame.setClusterId(frameId++);
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ EventFrame ef(e); // Non-const copy
+ if (ef.type == DATA) // Add cluster-id to to data frames.
+ ef.frame.setClusterId(frameId++);
+ ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
}
+ else // Drop connection frames while state < CATCHUP
+ QPID_LOG(trace, *this << " DROP: " << e);
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+ ConnectionPtr cp;
+ ConnectionMap::iterator i = connections.find(id);
+ if (i != connections.end())
+ cp = i->second;
+ else {
+ if(id.getMember() == self)
+ cp = localConnections.getErase(id);
+ else {
+ // New remote connection, create a shadow.
+ std::ostringstream mgmtId;
+ mgmtId << id;
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+ }
+ if (cp)
+ connections.insert(ConnectionMap::value_type(id, cp));
+ }
+ return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+ ConnectionVector result(connections.size());
+ std::transform(connections.begin(), connections.end(), result.begin(),
+ boost::bind(&ConnectionMap::value_type::second, _1));
+ return result;
}
struct AddrList {
@@ -316,7 +351,7 @@ void Cluster::configChange (
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
@@ -337,6 +372,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true, l);
+ discarding = false;
setReady(l);
map = ClusterMap(self, myUrl, true);
memberUpdate(l);
@@ -396,28 +432,18 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-void Cluster::stall(Lock&) {
- // Stop processing the deliveredEventQueue in order to send or
- // recieve an update.
- deliverEventQueue.stop();
-}
-
-void Cluster::unstall(Lock&) {
- // Stop processing the deliveredEventQueue in order to send or
- // recieve an update.
- deliverEventQueue.start();
-}
-
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+ // NOTE: deliverEventQueue has been stopped at the update offer by
+ // deliveredEvent in case an update is required.
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
- if (url) { // My offer was first.
+ if (url) // My offer was first.
updateStart(updatee, *url, l);
- }
else { // Another offer was first.
+ deliverEventQueue.start(); // Don't need to update
setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
makeOffer(map.firstJoiner(), l); // Maybe make another offer.
@@ -428,50 +454,48 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- stall(l);
checkUpdateIn(l);
}
+ else
+ deliverEventQueue.start(); // Don't need to update
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+ // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- stall(l);
-
+ QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
- updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+ updateThread = Thread(
+ new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs);
- // Push an empty frame onto the deliverFrameQueue to mark the stall point.
- // The deliverFrameQueue thread will start the update at that point.
- deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
+ cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
Lock l(lock);
updatedMap = m;
- eventId = eventId_;
- // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
+ // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
frameId = frameId_;
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& l) {
+void Cluster::checkUpdateIn(Lock&) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ discarding = false; // ok to set, we're stalled for update.
QPID_LOG(info, *this << " received update, starting catch-up");
- unstall(l);
+ deliverEventQueue.start();
}
}
@@ -485,7 +509,7 @@ void Cluster::updateOutDone(Lock& l) {
assert(state == UPDATER);
state = READY;
mcast.release();
- unstall(l);
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -569,15 +593,13 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Generate a deliver-close control frame for connections
- // belonging to defunct members, so they will be erased in the
- // deliverFrameQueue thread.
- ConnectionMap::Vector c = connections.values();
- for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
- ConnectionId cid = (*i)->getId();
- MemberId mid = cid.getMember();
- if (mid != self && !map.isMember(mid))
- connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+ // Erase connections belonging to members that have left the cluster.
+ ConnectionMap::iterator i = connections.begin();
+ while (i != connections.end()) {
+ ConnectionMap::iterator j = i++;
+ MemberId m = j->second->getId().getMember();
+ if (m != self && !map.isMember(m))
+ connections.erase(j);
}
}