diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
| commit | d39a165c9c8d1fa2fd728a2237117efa71848874 (patch) | |
| tree | dd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/cluster/Connection.cpp | |
| parent | f7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff) | |
| download | qpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz | |
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 84 |
1 files changed, 51 insertions, 33 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ae731ed25e..2f1518f871 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -44,7 +44,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, : cluster(c), self(myId), catchUp(false), output(*this, out), connection(&output, cluster.getBroker(), wrappedId) { - QPID_LOG(debug, "New connection: " << *this); + QPID_LOG(debug, cluster << " new connection: " << *this); } // Local connections @@ -53,11 +53,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), wrappedId) { - QPID_LOG(debug, "New connection: " << *this); + QPID_LOG(debug, cluster << " new connection: " << *this); } Connection::~Connection() { - QPID_LOG(debug, "Deleted connection: " << *this); + QPID_LOG(debug, cluster << " deleted connection: " << *this); } bool Connection::doOutput() { @@ -72,32 +72,36 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } +// FIXME aconway 2008-10-15: changes here, dubious. + // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "RECV " << *this << ": " << f); - if (isShadow()) { - // Intercept the close that completes catch-up for shadow a connection. - if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - catchUp = false; - cluster.insert(boost::intrusive_ptr<Connection>(this)); + QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); + if (isLocal()) { + currentChannel = f.getChannel(); + if (!framing::invoke(*this, *f.getBody()).wasHandled()) + connection.received(f); + } + else { // Shadow or dumped ex catch-up connection. + if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + if (isShadow()) { + QPID_LOG(debug, cluster << " inserting connection " << *this); + cluster.insert(boost::intrusive_ptr<Connection>(this)); + } AMQFrame ok(in_place<ConnectionCloseOkBody>()); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); + catchUp = false; } else - QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); - } - else { - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection.received(f); + QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f); } } // Delivered from cluster. void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, "DLVR " << *this << ": " << f); - assert(!isCatchUp()); + QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f); + assert(!catchUp); // Handle connection controls, deliver other frames to connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) @@ -106,24 +110,25 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { - QPID_LOG(debug, "Connection closed " << *this); if (catchUp) { - QPID_LOG(critical, cluster << " error on catch-up connection " << *this); + QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isDump()) + else if (isDumped()) { + QPID_LOG(debug, cluster << " closed dump connection " << *this); connection.closed(); + } else if (isLocal()) { + QPID_LOG(debug, cluster << " local close of replicated connection " << *this); // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.setOutputHandler(discardHandler); - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); - ++mcastSeq; + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq); } } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); + QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what()); } } @@ -135,7 +140,7 @@ void Connection::deliverClose () { // Decode data from local clients. size_t Connection::decode(const char* buffer, size_t size) { - if (catchUp || isDump()) { // Handle catch-up locally. + if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) received(localDecoder.frame); @@ -174,26 +179,39 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, "Received session state dump for " << s->getId()); + QPID_LOG(debug, cluster << " received session state dump for " << s->getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); self = shadow; } -void Connection::membership(const FieldTable& urls, const FieldTable& states) { - cluster.dumpInDone(ClusterMap(urls,states)); - catchUp = false; - self.second = 0; // Mark this as completed dump connection. +void Connection::membership(const FieldTable& newbies, const FieldTable& members) { + QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this); + cluster.dumpInDone(ClusterMap(newbies, members)); + self.second = 0; // Mark this as completed dump connection. } -bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; } +bool Connection::isLocal() const { + return self.first == cluster.getId() && self.second == this; +} + +bool Connection::isShadow() const { + return self.first != cluster.getId(); +} + +bool Connection::isDumped() const { + return self.first == cluster.getId() && self.second == 0; +} std::ostream& operator<<(std::ostream& o, const Connection& c) { - return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") - << (c.isCatchUp() ? ",catchup" : "") << ")"; + const char* type="unknown"; + if (c.isLocal()) type = "local"; + else if (c.isShadow()) type = "shadow"; + else if (c.isDumped()) type = "dumped"; + return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } }} // namespace qpid::cluster |
