summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-16 17:07:26 +0000
committerAlan Conway <aconway@apache.org>2008-10-16 17:07:26 +0000
commitd39a165c9c8d1fa2fd728a2237117efa71848874 (patch)
treedd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/cluster/Connection.cpp
parentf7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff)
downloadqpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp84
1 files changed, 51 insertions, 33 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ae731ed25e..2f1518f871 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -44,7 +44,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
: cluster(c), self(myId), catchUp(false), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
{
- QPID_LOG(debug, "New connection: " << *this);
+ QPID_LOG(debug, cluster << " new connection: " << *this);
}
// Local connections
@@ -53,11 +53,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
{
- QPID_LOG(debug, "New connection: " << *this);
+ QPID_LOG(debug, cluster << " new connection: " << *this);
}
Connection::~Connection() {
- QPID_LOG(debug, "Deleted connection: " << *this);
+ QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
bool Connection::doOutput() {
@@ -72,32 +72,36 @@ void Connection::deliverDoOutput(uint32_t requested) {
output.deliverDoOutput(requested);
}
+// FIXME aconway 2008-10-15: changes here, dubious.
+
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "RECV " << *this << ": " << f);
- if (isShadow()) {
- // Intercept the close that completes catch-up for shadow a connection.
- if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- catchUp = false;
- cluster.insert(boost::intrusive_ptr<Connection>(this));
+ QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+ if (isLocal()) {
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+ }
+ else { // Shadow or dumped ex catch-up connection.
+ if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ if (isShadow()) {
+ QPID_LOG(debug, cluster << " inserting connection " << *this);
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
+ }
AMQFrame ok(in_place<ConnectionCloseOkBody>());
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
+ catchUp = false;
}
else
- QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
- }
- else {
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+ QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f);
}
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, "DLVR " << *this << ": " << f);
- assert(!isCatchUp());
+ QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ assert(!catchUp);
// Handle connection controls, deliver other frames to connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -106,24 +110,25 @@ void Connection::delivered(framing::AMQFrame& f) {
void Connection::closed() {
try {
- QPID_LOG(debug, "Connection closed " << *this);
if (catchUp) {
- QPID_LOG(critical, cluster << " error on catch-up connection " << *this);
+ QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
cluster.leave();
}
- else if (isDump())
+ else if (isDumped()) {
+ QPID_LOG(debug, cluster << " closed dump connection " << *this);
connection.closed();
+ }
else if (isLocal()) {
+ QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.setOutputHandler(discardHandler);
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq);
}
}
catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+ QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what());
}
}
@@ -135,7 +140,7 @@ void Connection::deliverClose () {
// Decode data from local clients.
size_t Connection::decode(const char* buffer, size_t size) {
- if (catchUp || isDump()) { // Handle catch-up locally.
+ if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.frame);
@@ -174,26 +179,39 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, "Received session state dump for " << s->getId());
+ QPID_LOG(debug, cluster << " received session state dump for " << s->getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
self = shadow;
}
-void Connection::membership(const FieldTable& urls, const FieldTable& states) {
- cluster.dumpInDone(ClusterMap(urls,states));
- catchUp = false;
- self.second = 0; // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& newbies, const FieldTable& members) {
+ QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this);
+ cluster.dumpInDone(ClusterMap(newbies, members));
+ self.second = 0; // Mark this as completed dump connection.
}
-bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; }
+bool Connection::isLocal() const {
+ return self.first == cluster.getId() && self.second == this;
+}
+
+bool Connection::isShadow() const {
+ return self.first != cluster.getId();
+}
+
+bool Connection::isDumped() const {
+ return self.first == cluster.getId() && self.second == 0;
+}
std::ostream& operator<<(std::ostream& o, const Connection& c) {
- return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
- << (c.isCatchUp() ? ",catchup" : "") << ")";
+ const char* type="unknown";
+ if (c.isLocal()) type = "local";
+ else if (c.isShadow()) type = "shadow";
+ else if (c.isDumped()) type = "dumped";
+ return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
}} // namespace qpid::cluster