diff options
| author | Alan Conway <aconway@apache.org> | 2008-01-24 22:26:12 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-01-24 22:26:12 +0000 |
| commit | f0a31beb7a609591e7b34e60ddfd85e9e183fbc0 (patch) | |
| tree | 5582c3f04ee1b417d11050b0c994da657db09b39 /cpp/src/qpid/client | |
| parent | f2ab2fa9fcb713eedf21e98a2a3f9fab8e76dead (diff) | |
| download | qpid-python-f0a31beb7a609591e7b34e60ddfd85e9e183fbc0.tar.gz | |
Improved/additional client API tests.
- Replaced InProcessBroker with a more accurate loopback BrokerFixture.
- Added asserts for mutex/condition/thread errors in debug build.
- Added client tests for several exception conditions.
- Added peer address to log ouput, client/server distinguished by (addr) or [addr]
- Fixed various deadlocks & races exposed by the new asserts & tests.
File-by-file:
New BrokerFixture replaces InProcessBroker
D src/tests/InProcessBroker.h
M src/tests/BrokerFixture.h
M src/tests/SocketProxy.h
M src/tests/Makefile.am
Made it run a bit faster.
M src/tests/quick_perftest
Redundant
D src/tests/APRBaseTest.cpp
Updated tests to use BrokerFixture
M src/tests/ClientChannelTest.cpp
M src/tests/exception_test.cpp
M src/tests/ClientSessionTest.cpp
Print thread IDs in decimal, same as GDB.
M src/qpid/log/Logger.cpp
Assert mutex/condition ops in debug build.
M src/qpid/sys/posix/check.h
M src/qpid/sys/posix/Mutex.h
M src/qpid/sys/posix/Condition.h
M src/qpid/sys/posix/Thread.h
Added toFd() so SocketProxy can use ::select()
M src/qpid/sys/Socket.h
M src/qpid/sys/posix/Socket.cpp
Fixes for races & deadlocks shown up by new tests & asserts.
Mostly shutdown/close issues.
M src/qpid/client/ConnectionHandler.h
M src/qpid/client/ConnectionImpl.cpp
M src/qpid/client/Demux.h
M src/qpid/client/SessionCore.cpp
M src/qpid/client/ConnectionHandler.cpp
M src/qpid/client/Connector.h
M src/qpid/client/Demux.cpp
M src/qpid/client/Dispatcher.cpp
M src/qpid/client/ConnectionImpl.h
Logging peer address.
M src/qpid/sys/AsynchIOAcceptor.cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@615063 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 84 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Demux.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Demux.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 13 |
10 files changed, 88 insertions, 58 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 84b0768c27..e1c50c14fc 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -142,7 +142,7 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody void ConnectionHandler::fail(const std::string& message) { - QPID_LOG(error, message); + QPID_LOG(warning, message); setState(FAILED); } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index e409f0f2a9..bb50495c06 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -59,7 +59,6 @@ class ConnectionHandler : private StateManager, void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); void error(uint16_t code, const std::string& message, framing::AMQBody* body); - void fail(const std::string& message); public: using InputHandler::handle; @@ -75,6 +74,7 @@ public: void waitForOpen(); void close(); + void fail(const std::string& message); CloseListener onClose; ErrorListener onError; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index dd986deec4..b248de8744 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" @@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } -ConnectionImpl::~ConnectionImpl() { close(); } +ConnectionImpl::~ConnectionImpl() { + // Important to close the connector first, to ensure the + // connector thread does not call on us while the destructor + // is running. + connector->close(); +} void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; - if (s.lock()) - throw ChannelBusyException(); + if (s.lock()) throw ChannelBusyException(); s = session; } @@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port, handler.pwd = pwd; handler.vhost = vhost; + QPID_LOG(info, "Connecting to " << host << ":" << port); connector->connect(host, port); connector->init(); handler.waitForOpen(); } -bool ConnectionImpl::setClosing() -{ - Mutex::ScopedLock l(lock); - if (isClosing || isClosed) { - return false; - } - isClosing = true; - return true; -} - -void ConnectionImpl::close() -{ - if (setClosing()) { - handler.close(); - } -} - void ConnectionImpl::idleIn() { - connector->close(); + close(); } void ConnectionImpl::idleOut() @@ -114,35 +103,52 @@ void ConnectionImpl::idleOut() connector->send(frame); } -template <class F> -void ConnectionImpl::forChannels(F functor) { - for (SessionMap::iterator i = sessions.begin(); - i != sessions.end(); ++i) { - try { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) functor(*s); - } catch (...) { assert(0); } +void ConnectionImpl::close() +{ + Mutex::ScopedLock l(lock); + if (isClosing || isClosed) return; + isClosing = true; + { + Mutex::ScopedUnlock u(lock); + handler.close(); + } + closed(REPLY_SUCCESS, "Closed by client"); +} + +// Set closed flags and erase the sessions map, but keep the contents +// so sessions can be updated outside the lock. +ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + isClosed = true; + connector->close(); + SessionVector save; + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) save.push_back(s); } + sessions.clear(); + return save; } -void ConnectionImpl::shutdown() +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionBroke, _1, - INTERNAL_ERROR, "Unexpected socket closure.")); - sessions.clear(); - isClosed = true; + SessionVector save(closeInternal(l)); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); } -void ConnectionImpl::closed(uint16_t code, const std::string& text) +static const std::string CONN_CLOSED("Connection closed by broker"); + +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); - sessions.clear(); - isClosed = true; - connector->close(); + SessionVector save(closeInternal(l)); + handler.fail(CONN_CLOSED); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), + boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 1fe8ac4653..bf8226a776 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -43,6 +43,8 @@ class ConnectionImpl : public framing::FrameHandler, { typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap; + typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector; + SessionMap sessions; ConnectionHandler handler; boost::shared_ptr<Connector> connector; @@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; bool isClosing; + template <class F> void detachAll(const F&); + + SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); @@ -58,8 +63,6 @@ class ConnectionImpl : public framing::FrameHandler, void shutdown(); bool setClosing(); - template <class F> void forChannels(F functor); - public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 95314dcb40..4fb5aa6b4d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -27,7 +27,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" - +#include "qpid/Msg.h" #include <boost/bind.hpp> namespace qpid { @@ -43,6 +43,7 @@ Connector::Connector( send_buffer_size(buffer_size), version(ver), closed(true), + joined(true), timeout(0), idleIn(0), idleOut(0), timeoutHandler(0), @@ -52,11 +53,11 @@ Connector::Connector( Connector::~Connector() { close(); - if (receiver.id() && receiver.id() != Thread::current().id()) - receiver.join(); } void Connector::connect(const std::string& host, int port){ + Mutex::ScopedLock l(closedLock); + assert(closed); socket.connect(host, port); closed = false; poller = Poller::shared_ptr(new Poller); @@ -71,20 +72,27 @@ void Connector::connect(const std::string& host, int port){ } void Connector::init(){ + Mutex::ScopedLock l(closedLock); + assert(joined); ProtocolInitiation init(version); - writeDataBlock(init); + joined = false; receiver = Thread(this); } bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); + bool ret = !closed; if (!closed) { - poller->shutdown(); closed = true; - return true; + poller->shutdown(); + } + if (!joined && receiver.id() != Thread::current().id()) { + joined = true; + Mutex::ScopedUnlock u(closedLock); + receiver.join(); } - return false; + return ret; } void Connector::close() { diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index aefd91f6f4..121a1c33aa 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler, const int send_buffer_size; framing::ProtocolVersion version; - bool closed; sys::Mutex closedLock; + bool closed; + bool joined; sys::AbsTime lastIn; sys::AbsTime lastOut; @@ -112,6 +113,8 @@ class Connector : public framing::OutputHandler, void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); + + std::string identifier; friend class Channel; @@ -130,6 +133,7 @@ class Connector : public framing::OutputHandler, virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); virtual void setWriteTimeout(uint16_t timeout); + const std::string& getIdentifier() const { return identifier; } }; }} diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp index e61103981b..cb9372cee7 100644 --- a/cpp/src/qpid/client/Demux.cpp +++ b/cpp/src/qpid/client/Demux.cpp @@ -45,6 +45,10 @@ ScopedDivert::~ScopedDivert() demuxer.remove(dest); } +Demux::Demux() : defaultQueue(new Queue()) {} + +Demux::~Demux() { close(); } + Demux::QueuePtr ScopedDivert::getQueue() { return queue; diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h index 234282a8d2..dce24223f2 100644 --- a/cpp/src/qpid/client/Demux.h +++ b/cpp/src/qpid/client/Demux.h @@ -47,7 +47,8 @@ public: typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue; typedef boost::shared_ptr<Queue> QueuePtr; - Demux() : defaultQueue(new Queue()) {} + Demux(); + ~Demux(); void handle(framing::FrameSet::shared_ptr); void close(); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index f4a7ff54d8..0783d5bc55 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -62,13 +62,12 @@ void Dispatcher::start() } void Dispatcher::run() -{ +{ Mutex::ScopedLock l(lock); if (running) throw Exception("Dispatcher is already running."); boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; - queue->open(); try { while (!queue->isClosed()) { Mutex::ScopedUnlock u(lock); diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 07a791bef3..5079c47b5e 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -125,7 +125,7 @@ void SessionCore::detach(int c, const std::string& t) { channel.next = 0; code=c; text=t; - l3.getDemux().close(); + l3.getDemux().close(); } void SessionCore::doClose(int code, const std::string& text) { @@ -270,7 +270,6 @@ void SessionCore::detached() { // network thread Lock l(state); check(state == SUSPENDING, COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED); - connection->erase(channel); doSuspend(REPLY_SUCCESS, OK); } @@ -379,22 +378,28 @@ bool isCloseResponse(const AMQFrame& frame) { // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + ConnectionImpl::shared_ptr save; { Lock l(state); + save=connection; // Ignore frames received while closing other than closed response. if (state==CLOSING && !isCloseResponse(frame)) return; } try { // Cast to expose private SessionHandler functions. - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + // If we were detached by a session command, tell the connection. + if (!connection) save->erase(channel); + } + else { session->received(frame); l3.handle(frame); } } catch (const ChannelException& e) { QPID_LOG(error, "Channel exception:" << e.what()); doClose(e.code, e.what()); - } + } } void SessionCore::handleOut(AMQFrame& frame) |
