| author | Alan Conway <aconway@apache.org> | 2009-03-08 23:52:35 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-03-08 23:52:35 +0000 |
| commit | bab8070ad7989386b11f4106d9f15e73d9246c1d (patch) | |
| tree | dfca93bc4ca56de921c01f87e966855816744484 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | e14ed937f459d07735a5ed22636127fdf81dc88c (diff) | |
| download | qpid-python-bab8070ad7989386b11f4106d9f15e73d9246c1d.tar.gz | |
Fixed race conditions in cluster.
Execute all cluster logic in the deliverFrameQueue thread;
only decoding runs in the deliverEventQueue thread.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751557 13f79535-47bb-0310-9956-ffa450edef68
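
The change described above replaces a single locked handler with a two-stage pipeline: the deliverEventQueue thread only decodes raw events into frames, and the deliverFrameQueue thread is the only one that executes cluster logic. The sketch below illustrates that handoff in isolation; it is not Qpid code, and `Event`, `EventFrame`, and `BlockingQueue` are simplified stand-ins for the real qpid::cluster types and its pollable queues.

```cpp
// Minimal sketch (not Qpid code) of the two-stage queue pattern this commit
// moves to: one thread only decodes events into frames, a second thread owns
// all cluster state and executes the logic.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

struct Event      { std::string data; };   // raw bytes delivered by the group
struct EventFrame { std::string frame; };  // decoded frame

// A tiny blocking queue; the real code uses Qpid's own pollable queues.
template <class T> class BlockingQueue {
    std::queue<T> q;
    std::mutex m;
    std::condition_variable cv;
public:
    void push(T v) {
        { std::lock_guard<std::mutex> l(m); q.push(std::move(v)); }
        cv.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return !q.empty(); });
        T v = std::move(q.front()); q.pop(); return v;
    }
};

int main() {
    BlockingQueue<Event> eventQueue;       // analogous to deliverEventQueue
    BlockingQueue<EventFrame> frameQueue;  // analogous to deliverFrameQueue

    // Decoder thread: touches no shared cluster state, only decodes.
    std::thread decoder([&] {
        for (;;) {
            Event e = eventQueue.pop();
            if (e.data.empty()) { frameQueue.push(EventFrame{""}); break; } // shutdown marker
            frameQueue.push(EventFrame{"frame(" + e.data + ")"});
        }
    });

    // Logic thread: the only thread that would read or write cluster state,
    // so that state needs no locking against the decoder.
    std::thread logic([&] {
        for (;;) {
            EventFrame f = frameQueue.pop();
            if (f.frame.empty()) break;
            std::cout << "executing " << f.frame << "\n";
        }
    });

    eventQueue.push(Event{"config-change"});
    eventQueue.push(Event{"connection-data"});
    eventQueue.push(Event{});  // shut the pipeline down

    decoder.join();
    logic.join();
}
```

Because only the second thread works on the (stand-in) cluster state, races between decoding and logic of the kind this commit fixes cannot occur.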
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 178 |
1 file changed, 100 insertions, 78 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8946a71446..169d0fb1af 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -107,11 +107,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
                    boost::bind(&Cluster::leave, this),
                    "Error delivering frames", poller),
-    connections(*this),
-    frameId(0),
     initialized(false),
+    decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+    discarding(true),
     state(INIT),
-    eventId(0),
+    frameId(0),
     lastSize(0),
     lastBroker(false)
 {
@@ -156,14 +156,19 @@ void Cluster::initialize() {
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    connections.insert(c);
+    localConnections.insert(c);
 }
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    connections.insert(c);
+    // Safe to use connections here because we're pre-catchup, either
+    // discarding or stalled, so deliveredFrame is not processing any
+    // connection events.
+    assert(discarding);
+    connections.insert(ConnectionMap::value_type(c->getId(), c));
 }
 
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id) {
     connections.erase(id);
 }
@@ -195,7 +200,6 @@ void Cluster::leave(Lock&) {
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        connections.clear();
         try { broker.shutdown(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -217,58 +221,89 @@ void Cluster::deliver(
     Event e(Event::decodeCopy(from, buf));
     if (from == self)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
-    deliver(e);
+    deliverEvent(e);
 }
 
-void Cluster::deliver(const Event& e) {
+void Cluster::deliverEvent(const Event& e) {
     deliverEventQueue.push(e);
 }
 
+void Cluster::deliverFrame(const EventFrame& e) {
+    deliverFrameQueue.push(e);
+}
+
 // Handler for deliverEventQueue.
-// This thread executes cluster controls and decodes connection data events.
-void Cluster::deliveredEvent(const Event& event) {
-    Event e(event);
-    Mutex::ScopedLock l(lock);
-    if (state >= CATCHUP) {
-        e.setId(++eventId);
+// This thread decodes frames from events.
+void Cluster::deliveredEvent(const Event& e) {
     QPID_LOG(trace, *this << " DLVR: " << e);
-    }
-    if (e.isCluster()) {        // Cluster control, process in this thread.
+    if (e.isCluster()) {
         EventFrame ef(e, e.getFrame());
-        QPID_LOG(trace, *this << " DLVR: " << ef);
-        ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
-        if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
-            throw Exception(QPID_MSG("Invalid cluster control"));
+        // Stop the deliverEventQueue on update offers.
+        // This preserves the connection decoder fragments for an update.
+        ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+        if (offer)
+            deliverEventQueue.stop();
+        deliverFrame(ef);
     }
-    else if (state >= CATCHUP) { // Handle connection frames
-        if (e.getType() == CONTROL)
-            connectionFrame(EventFrame(e, e.getFrame()));
+    else if(!discarding) {
+        if (e.isControl())
+            deliverFrame(EventFrame(e, e.getFrame()));
         else
-            connections.decode(e, e.getData());
-    }
-    // Drop connection frames while state < CATCHUP
+            decoder.decode(e, e.getData());
     }
-
-void Cluster::connectionFrame(const EventFrame& frame) {
-    deliverFrameQueue.push(frame);
+    else // Discard connection events if discarding is set.
+        QPID_LOG(trace, *this << " DROP: " << e);
 }
 
 // Handler for deliverFrameQueue.
-// This thread executes connection control and data frames.
-void Cluster::deliveredFrame(const EventFrame& event) {
-    // No lock, only use connections, not Cluster state.
-    EventFrame e(event);
-    if(!e.frame.getBody()) { // marks the stall point, start the update task.
-        updateThread=Thread(*updateTask);
+// This thread executes the main logic.
+void Cluster::deliveredFrame(const EventFrame& e) {
+    Mutex::ScopedLock l(lock);
+    if (e.isCluster()) {
+        QPID_LOG(trace, *this << " DLVR: " << e);
+        ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
+        if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+            throw Exception(QPID_MSG("Invalid cluster control"));
     }
-    else {
+    else if (state >= CATCHUP) {
         QPID_LOG(trace, *this << " DLVR: " << e);
-        if (e.type == DATA) // Add cluster-id to to data frames.
-            e.frame.setClusterId(frameId++);
-        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+        EventFrame ef(e);    // Non-const copy
+        if (ef.type == DATA) // Add cluster-id to to data frames.
+            ef.frame.setClusterId(frameId++);
+        ConnectionPtr connection = getConnection(e.connectionId, l);
         if (connection)
             connection->deliveredFrame(e);
     }
+    else // Drop connection frames while state < CATCHUP
+        QPID_LOG(trace, *this << " DROP: " << e);
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+    ConnectionPtr cp;
+    ConnectionMap::iterator i = connections.find(id);
+    if (i != connections.end())
+        cp = i->second;
+    else {
+        if(id.getMember() == self)
+            cp = localConnections.getErase(id);
+        else {
+            // New remote connection, create a shadow.
+            std::ostringstream mgmtId;
+            mgmtId << id;
+            cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+        }
+        if (cp)
+            connections.insert(ConnectionMap::value_type(id, cp));
+    }
+    return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+    ConnectionVector result(connections.size());
+    std::transform(connections.begin(), connections.end(), result.begin(),
+                   boost::bind(&ConnectionMap::value_type::second, _1));
+    return result;
 }
 
 struct AddrList {
@@ -316,7 +351,7 @@ void Cluster::configChange (
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p)
         addresses.append(MemberId(*p).str());
-    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+    deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
 }
 
 void Cluster::setReady(Lock&) {
@@ -337,6 +372,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
             setClusterId(true, l);
+            discarding = false;
             setReady(l);
             map = ClusterMap(self, myUrl, true);
             memberUpdate(l);
@@ -396,28 +432,18 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     }
 }
 
-void Cluster::stall(Lock&) {
-    // Stop processing the deliveredEventQueue in order to send or
-    // recieve an update.
-    deliverEventQueue.stop();
-}
-
-void Cluster::unstall(Lock&) {
-    // Stop processing the deliveredEventQueue in order to send or
-    // recieve an update.
-    deliverEventQueue.start();
-}
-
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+    // NOTE: deliverEventQueue has been stopped at the update offer by
+    // deliveredEvent in case an update is required.
     if (state == LEFT) return;
     MemberId updatee(updateeInt);
     boost::optional<Url> url = map.updateOffer(updater, updatee);
     if (updater == self) {
         assert(state == OFFER);
-        if (url) {              // My offer was first.
+        if (url)                // My offer was first.
             updateStart(updatee, *url, l);
-        }
         else {                  // Another offer was first.
+            deliverEventQueue.start(); // Don't need to update
             setReady(l);
             QPID_LOG(info, *this << " cancelled update offer to " << updatee);
             makeOffer(map.firstJoiner(), l); // Maybe make another offer.
@@ -428,50 +454,48 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
             setClusterId(uuid, l);
             state = UPDATEE;
             QPID_LOG(info, *this << " receiving update from " << updater);
-            stall(l);
             checkUpdateIn(l);
         }
+        else
+            deliverEventQueue.start(); // Don't need to update
     }
 }
 
 void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+    // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
     if (state == LEFT) return;
     assert(state == OFFER);
     state = UPDATER;
-    QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
-    stall(l);
-
+    QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
     if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks.
     client::ConnectionSettings cs;
    cs.username = settings.username;
    cs.password = settings.password;
    cs.mechanism = settings.mechanism;
-    updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+    updateThread = Thread(
+        new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
-                         cs);
-    // Push an empty frame onto the deliverFrameQueue to mark the stall point.
-    // The deliverFrameQueue thread will start the update at that point.
-    deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
+                         cs));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
     Lock l(lock);
     updatedMap = m;
-    eventId = eventId_;
+    // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
     frameId = frameId_;
     checkUpdateIn(l);
 }
 
-void Cluster::checkUpdateIn(Lock& l) {
+void Cluster::checkUpdateIn(Lock&) {
     if (state == UPDATEE && updatedMap) {
         map = *updatedMap;
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
+        discarding = false; // ok to set, we're stalled for update.
         QPID_LOG(info, *this << " received update, starting catch-up");
-        unstall(l);
+        deliverEventQueue.start();
     }
 }
@@ -485,7 +509,7 @@ void Cluster::updateOutDone(Lock& l) {
     assert(state == UPDATER);
     state = READY;
     mcast.release();
-    unstall(l);
+    deliverEventQueue.start(); // Start processing events again.
     makeOffer(map.firstJoiner(), l); // Try another offer
 }
@@ -569,15 +593,13 @@ void Cluster::memberUpdate(Lock& l) {
         mgmtObject->set_memberIDs(idstr);
     }
 
-    // Generate a deliver-close control frame for connections
-    // belonging to defunct members, so they will be erased in the
-    // deliverFrameQueue thread.
-    ConnectionMap::Vector c = connections.values();
-    for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
-        ConnectionId cid = (*i)->getId();
-        MemberId mid = cid.getMember();
-        if (mid != self && !map.isMember(mid))
-            connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+    // Erase connections belonging to members that have left the cluster.
+    ConnectionMap::iterator i = connections.begin();
+    while (i != connections.end()) {
+        ConnectionMap::iterator j = i++;
+        MemberId m = j->second->getId().getMember();
+        if (m != self && !map.isMember(m))
+            connections.erase(j);
     }
 }
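
The reworked memberUpdate at the end of the diff erases the connections of departed members while walking the map, copying the iterator before advancing so the erase does not invalidate the loop. A standalone sketch of that idiom, assuming a plain std::map in place of the cluster's ConnectionMap and a hypothetical isDeparted() predicate in place of the membership test:

```cpp
// Sketch of the erase-while-iterating idiom used in memberUpdate above.
// std::map<int, std::string> stands in for the cluster's ConnectionMap;
// isDeparted() stands in for the "m != self && !map.isMember(m)" test.
#include <iostream>
#include <map>
#include <string>

static bool isDeparted(int member) { return member == 2; } // hypothetical predicate

int main() {
    std::map<int, std::string> connections = {
        {1, "conn-a"}, {2, "conn-b"}, {3, "conn-c"}
    };
    std::map<int, std::string>::iterator i = connections.begin();
    while (i != connections.end()) {
        std::map<int, std::string>::iterator j = i++; // advance before a possible erase
        if (isDeparted(j->first))
            connections.erase(j);  // erase() only invalidates j; i stays valid
    }
    for (const auto& c : connections)
        std::cout << c.first << " -> " << c.second << "\n"; // prints entries 1 and 3
}
```

In std::map, erase() invalidates only the erased iterator, which is why advancing first and erasing the saved copy is safe.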
