diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-26 21:49:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-26 21:49:52 +0000 |
| commit | 47b7d230566810cd84446859b63885329186e943 (patch) | |
| tree | 121e22b2c8c84f27dee0c2b5aff6542075ea99a8 /cpp/src/qpid/cluster/Connection.cpp | |
| parent | 41b2637e20345f264cff40be2885729c315e5828 (diff) | |
| download | qpid-python-47b7d230566810cd84446859b63885329186e943.tar.gz | |
Clean up end-of-dump protocol for new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@699513 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 30 |
1 files changed, 9 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 21eec0f86e..6aab31c177 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -76,10 +76,13 @@ void Connection::deliverDoOutput(uint32_t requested) { void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, "RECV " << *this << ": " << f); if (isShadow()) { - // Final close that completes catch-up for shadow connection. + // Intercept the close that completes catch-up for shadow a connection. if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + catchUp = false; AMQFrame ok(in_place<ConnectionCloseOkBody>()); + cluster.insert(boost::intrusive_ptr<Connection>(this)); connection.getOutput().send(ok); + output.setOutputHandler(discardHandler); } else QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); @@ -104,27 +107,13 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { QPID_LOG(debug, "Connection closed " << *this); - - if (catchUp) { - cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); - if (isShadow()) - catchUp = false; - else { - connection.closed(); - return; - } - } - - // Local network connection has closed. We need to keep the - // connection around but replace the output handler with a - // no-op handler as the network output handler will be - // deleted. - output.setOutputHandler(discardHandler); - - if (isLocal() && !catchUp) { + if (catchUp) + connection.closed(); + else if (isLocal()) { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. + output.setOutputHandler(discardHandler); cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); ++mcastSeq; } @@ -188,11 +177,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { ConnectionId shadow = ConnectionId(memberId, connectionId); QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); self = shadow; - assert(isShadow()); } void Connection::dumpComplete() { - // FIXME aconway 2008-09-18: use or remove. + cluster.dumpComplete(); } bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } |
