From 1a630dfdb95d2fe4bba7aedde6fb162a3f1af40e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 14 Jul 2010 21:33:09 +0000 Subject: Fix read-credit bug causing cluster brokers to disconnect clients sporadically. Also added connection identifier in connection log messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@964213 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 25 ++++++++++++++++++------- qpid/cpp/src/qpid/client/ConnectionImpl.h | 4 +++- qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 8 ++++++-- 3 files changed, 27 insertions(+), 10 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 397bd4e39b..44819a88a2 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -236,7 +236,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) { - QPID_LOG(info, "Dropping frame received on invalid channel: " << frame); + QPID_LOG(info, *this << " dropping frame received on invalid channel: " << frame); } else { s->in(frame); } @@ -252,7 +252,6 @@ void ConnectionImpl::open() const std::string& protocol = handler.protocol; const std::string& host = handler.host; int port = handler.port; - QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); theIO().add(); connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); @@ -267,6 +266,7 @@ void ConnectionImpl::open() throw; } connector->init(); + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); // Enable heartbeat if requested uint16_t heartbeat = static_cast(handler).heartbeat; @@ -291,10 +291,10 @@ void ConnectionImpl::open() //enable security layer if one has been negotiated: std::auto_ptr securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { - QPID_LOG(debug, "Activating security layer"); + QPID_LOG(debug, *this << " activating security layer"); connector->activateSecurityLayer(securityLayer); } else { - QPID_LOG(debug, "No security layer in place"); + QPID_LOG(debug, *this << " no security layer in place"); } } @@ -401,17 +401,20 @@ void ConnectionImpl::failedConnection() { bool isClosing = handler.isClosing(); bool isOpen = handler.isOpen(); + std::ostringstream msg; + msg << *this << " closed"; + // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have // an appropriate close-code. connection-forced is not right. - handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions + handler.fail(msg.str());//ensure connection is marked as failed before notifying sessions // At this point if the object isn't open and isn't closing it must have failed to open // so we can't do the rest of the cleanup if (!isClosing && !isOpen) return; Mutex::ScopedLock l(lock); - closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED)); - setException(new TransportFailure(CONN_CLOSED)); + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, msg.str())); + setException(new TransportFailure(msg.str())); } void ConnectionImpl::erase(uint16_t ch) { @@ -435,4 +438,12 @@ boost::shared_ptr ConnectionImpl::newSession(const std::string& na return simpl; } +std::ostream& operator<<(std::ostream& o, const ConnectionImpl& c) { + if (c.connector) + return o << "Connection " << c.connector->getIdentifier(); + else + return o << "Connection "; +} + + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h index 57d874b555..cc81500b18 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h @@ -31,6 +31,7 @@ #include "qpid/sys/TimeoutHandler.h" #include +#include #include #include #include @@ -95,8 +96,9 @@ class ConnectionImpl : public Bounds, std::vector getInitialBrokers(); void registerFailureCallback ( boost::function fn ) { failureCallback = fn; } - framing::ProtocolVersion getVersion() { return version; } + + friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&); }; }} diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7d85b4325b..119a6aa8a4 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -400,10 +400,14 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { } /* - * We keep on reading as long as we have something to read and a buffer to put - * it in + * We keep on reading as long as we have something to read, a buffer + * to put it in and reading is not stopped by flow control. */ void AsynchIO::readable(DispatchHandle& h) { + if (readingStopped) { + // We have been flow controlled. + return; + } int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { -- cgit v1.2.1