summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp20
1 files changed, 13 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 6aab31c177..ae731ed25e 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -77,10 +77,10 @@ void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "RECV " << *this << ": " << f);
if (isShadow()) {
// Intercept the close that completes catch-up for shadow a connection.
- if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
catchUp = false;
- AMQFrame ok(in_place<ConnectionCloseOkBody>());
cluster.insert(boost::intrusive_ptr<Connection>(this));
+ AMQFrame ok(in_place<ConnectionCloseOkBody>());
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
}
@@ -107,7 +107,11 @@ void Connection::delivered(framing::AMQFrame& f) {
void Connection::closed() {
try {
QPID_LOG(debug, "Connection closed " << *this);
- if (catchUp)
+ if (catchUp) {
+ QPID_LOG(critical, cluster << " error on catch-up connection " << *this);
+ cluster.leave();
+ }
+ else if (isDump())
connection.closed();
else if (isLocal()) {
// This was a local replicated connection. Multicast a deliver
@@ -131,7 +135,7 @@ void Connection::deliverClose () {
// Decode data from local clients.
size_t Connection::decode(const char* buffer, size_t size) {
- if (catchUp) { // Handle catch-up locally.
+ if (catchUp || isDump()) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.frame);
@@ -179,11 +183,13 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::dumpComplete() {
- cluster.dumpComplete();
+void Connection::membership(const FieldTable& urls, const FieldTable& states) {
+ cluster.dumpInDone(ClusterMap(urls,states));
+ catchUp = false;
+ self.second = 0; // Mark this as completed dump connection.
}
-bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
+bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; }
std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")