diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 6aab31c177..ae731ed25e 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -77,10 +77,10 @@ void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, "RECV " << *this << ": " << f); if (isShadow()) { // Intercept the close that completes catch-up for shadow a connection. - if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { catchUp = false; - AMQFrame ok(in_place<ConnectionCloseOkBody>()); cluster.insert(boost::intrusive_ptr<Connection>(this)); + AMQFrame ok(in_place<ConnectionCloseOkBody>()); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); } @@ -107,7 +107,11 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { QPID_LOG(debug, "Connection closed " << *this); - if (catchUp) + if (catchUp) { + QPID_LOG(critical, cluster << " error on catch-up connection " << *this); + cluster.leave(); + } + else if (isDump()) connection.closed(); else if (isLocal()) { // This was a local replicated connection. Multicast a deliver @@ -131,7 +135,7 @@ void Connection::deliverClose () { // Decode data from local clients. size_t Connection::decode(const char* buffer, size_t size) { - if (catchUp) { // Handle catch-up locally. + if (catchUp || isDump()) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) received(localDecoder.frame); @@ -179,11 +183,13 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::dumpComplete() { - cluster.dumpComplete(); +void Connection::membership(const FieldTable& urls, const FieldTable& states) { + cluster.dumpInDone(ClusterMap(urls,states)); + catchUp = false; + self.second = 0; // Mark this as completed dump connection. } -bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } +bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; } std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") |
