diff options
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 8 |
3 files changed, 27 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 397bd4e39b..44819a88a2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -236,7 +236,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) { - QPID_LOG(info, "Dropping frame received on invalid channel: " << frame); + QPID_LOG(info, *this << " dropping frame received on invalid channel: " << frame); } else { s->in(frame); } @@ -252,7 +252,6 @@ void ConnectionImpl::open() const std::string& protocol = handler.protocol; const std::string& host = handler.host; int port = handler.port; - QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); theIO().add(); connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); @@ -267,6 +266,7 @@ void ConnectionImpl::open() throw; } connector->init(); + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); // Enable heartbeat if requested uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; @@ -291,10 +291,10 @@ void ConnectionImpl::open() //enable security layer if one has been negotiated: std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { - QPID_LOG(debug, "Activating security layer"); + QPID_LOG(debug, *this << " activating security layer"); connector->activateSecurityLayer(securityLayer); } else { - QPID_LOG(debug, "No security layer in place"); + QPID_LOG(debug, *this << " no security layer in place"); } } @@ -401,17 +401,20 @@ void ConnectionImpl::failedConnection() { bool isClosing = handler.isClosing(); bool isOpen = handler.isOpen(); + std::ostringstream msg; + msg << *this << " closed"; + // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have // an appropriate close-code. connection-forced is not right. - handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions + handler.fail(msg.str());//ensure connection is marked as failed before notifying sessions // At this point if the object isn't open and isn't closing it must have failed to open // so we can't do the rest of the cleanup if (!isClosing && !isOpen) return; Mutex::ScopedLock l(lock); - closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED)); - setException(new TransportFailure(CONN_CLOSED)); + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, msg.str())); + setException(new TransportFailure(msg.str())); } void ConnectionImpl::erase(uint16_t ch) { @@ -435,4 +438,12 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } +std::ostream& operator<<(std::ostream& o, const ConnectionImpl& c) { + if (c.connector) + return o << "Connection " << c.connector->getIdentifier(); + else + return o << "Connection <not connected>"; +} + + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 57d874b555..cc81500b18 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -31,6 +31,7 @@ #include "qpid/sys/TimeoutHandler.h" #include <map> +#include <iosfwd> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> #include <boost/scoped_ptr.hpp> @@ -95,8 +96,9 @@ class ConnectionImpl : public Bounds, std::vector<Url> getInitialBrokers(); void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; } - framing::ProtocolVersion getVersion() { return version; } + + friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&); }; }} diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7d85b4325b..119a6aa8a4 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -400,10 +400,14 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { } /* - * We keep on reading as long as we have something to read and a buffer to put - * it in + * We keep on reading as long as we have something to read, a buffer + * to put it in and reading is not stopped by flow control. */ void AsynchIO::readable(DispatchHandle& h) { + if (readingStopped) { + // We have been flow controlled. + return; + } int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { |