From 729e9ce65125154cfdd2877abc8f7a901ad7caa2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 3 Feb 2009 21:28:14 +0000 Subject: Fix for race conditions in cluster join. - ConnectionDecoder: separated from Connection. - cluster/PollableQueue: stop processing frames if PollableQueue is stopped. - move state checks in event-queue handler to frame-queue handler. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740459 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Connection.cpp | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) (limited to 'cpp/src/qpid/cluster/Connection.cpp') diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9016e812be..a71950ef1d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocal() && !isCatchUp() && cluster.getReadMax()) { + if (isLocalClient()) { + cluster.addLocalConnection(this); + if (cluster.getReadMax()) output.giveReadCredit(cluster.getReadMax()); } } @@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) { // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { + if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or updated ex catch-up connection. + else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA()) { - if (isShadow()) { - QPID_LOG(debug, cluster << " inserting connection " << *this); - cluster.insert(boost::intrusive_ptr(this)); - } + if (isShadow()) + cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); @@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { return !message.empty(); } -// Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { - assert(!catchUp); - Buffer buf(e); - // Set read credit on the last frame. - ++readCredit; // One credit per buffer. - if (!mcastDecoder.decode(buf)) return; - AMQFrame frame(mcastDecoder.frame); - while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, e, frame)); - frame = mcastDecoder.frame; - } - frameq.push(EventFrame(this, e, frame, readCredit)); - readCredit = 0; -} - - -// Delivered from cluster. +// Called in delivery thread, in cluster order. void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); -- cgit v1.2.1