summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-26 21:49:52 +0000
committerAlan Conway <aconway@apache.org>2008-09-26 21:49:52 +0000
commit47b7d230566810cd84446859b63885329186e943 (patch)
tree121e22b2c8c84f27dee0c2b5aff6542075ea99a8 /cpp/src/qpid/cluster/Connection.cpp
parent41b2637e20345f264cff40be2885729c315e5828 (diff)
downloadqpid-python-47b7d230566810cd84446859b63885329186e943.tar.gz
Clean up end-of-dump protocol for new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@699513 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp30
1 files changed, 9 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 21eec0f86e..6aab31c177 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -76,10 +76,13 @@ void Connection::deliverDoOutput(uint32_t requested) {
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "RECV " << *this << ": " << f);
if (isShadow()) {
- // Final close that completes catch-up for shadow connection.
+ // Intercept the close that completes catch-up for shadow a connection.
if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ catchUp = false;
AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
connection.getOutput().send(ok);
+ output.setOutputHandler(discardHandler);
}
else
QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
@@ -104,27 +107,13 @@ void Connection::delivered(framing::AMQFrame& f) {
void Connection::closed() {
try {
QPID_LOG(debug, "Connection closed " << *this);
-
- if (catchUp) {
- cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- if (isShadow())
- catchUp = false;
- else {
- connection.closed();
- return;
- }
- }
-
- // Local network connection has closed. We need to keep the
- // connection around but replace the output handler with a
- // no-op handler as the network output handler will be
- // deleted.
- output.setOutputHandler(discardHandler);
-
- if (isLocal() && !catchUp) {
+ if (catchUp)
+ connection.closed();
+ else if (isLocal()) {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
+ output.setOutputHandler(discardHandler);
cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
@@ -188,11 +177,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
self = shadow;
- assert(isShadow());
}
void Connection::dumpComplete() {
- // FIXME aconway 2008-09-18: use or remove.
+ cluster.dumpComplete();
}
bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }