From 7bbd0fdd577b167127633a7b52fe7ea487b1f267 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Mon, 7 Dec 2009 15:42:14 +0000 Subject: QPID-2214: Opening and closing client connections causes memory use to grow unboundedly - Clean up the DeletionManager state for each thread when the thread exits git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@887956 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/DeletionManager.h | 40 +++++++++++++++++++++++---- cpp/src/qpid/sys/epoll/EpollPoller.cpp | 50 ++++++++++++++++++++-------------- 2 files changed, 63 insertions(+), 27 deletions(-) (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/sys/DeletionManager.h b/cpp/src/qpid/sys/DeletionManager.h index 43154eb98e..5d8428966e 100644 --- a/cpp/src/qpid/sys/DeletionManager.h +++ b/cpp/src/qpid/sys/DeletionManager.h @@ -54,6 +54,8 @@ struct deleter template class DeletionManager { + struct ThreadStatus; + public: // Mark every thread as using the handle - it will be deleted // below after every thread marks the handle as unused @@ -65,6 +67,28 @@ public: // handles get deleted here when no one else // is using them either static void markAllUnusedInThisThread() { + ThreadStatus* threadStatus = getThreadStatus(); + ScopedLock l(threadStatus->lock); + + // The actual deletions will happen here when all the shared_ptr + // ref counts hit 0 (that is when every thread marks the handle unused) + threadStatus->handles.clear(); + } + + static void destroyThreadState() { + ThreadStatus* threadStatus = getThreadStatus(); + { + ScopedLock l(threadStatus->lock); + + allThreadsStatuses.delThreadStatus(threadStatus); + } + delete threadStatus; + threadStatus = 0; + } + +private: + + static ThreadStatus*& getThreadStatus() { static __thread ThreadStatus* threadStatus = 0; // Thread local vars can't be dynamically constructed so we need @@ -75,14 +99,9 @@ public: allThreadsStatuses.addThreadStatus(threadStatus); } - ScopedLock l(threadStatus->lock); - - // The actual deletions will happen here when all the shared_ptr - // ref counts hit 0 (that is when every thread marks the handle unused) - threadStatus->handles.clear(); + return threadStatus; } -private: typedef boost::shared_ptr shared_ptr; // In theory we know that we never need more handles than the number of @@ -125,6 +144,15 @@ private: statuses.push_back(t); } + void delThreadStatus(ThreadStatus* t) { + ScopedLock l(lock); + typename std::vector::iterator it = + std::find(statuses.begin(),statuses.end(), t); + if (it != statuses.end()) { + statuses.erase(it); + } + } + void addHandle(shared_ptr h) { ScopedLock l(lock); std::for_each(statuses.begin(), statuses.end(), handleAdder(h)); diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index fd9a4b3468..d7f64f3b4c 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -25,6 +25,7 @@ #include "qpid/sys/DeletionManager.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" #include #include @@ -467,28 +468,35 @@ bool Poller::interrupt(PollerHandle& handle) { } void Poller::run() { - // Make sure we can't be interrupted by signals at a bad time - ::sigset_t ss; - ::sigfillset(&ss); - ::pthread_sigmask(SIG_SETMASK, &ss, 0); - - do { - Event event = wait(); - - // If can read/write then dispatch appropriate callbacks - if (event.handle) { - event.process(); - } else { - // Handle shutdown - switch (event.type) { - case SHUTDOWN: - return; - default: - // This should be impossible - assert(false); + // Ensure that we exit thread responsibly under all circumstances + try { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + PollerHandleDeletionManager.destroyThreadState(); + return; + default: + // This should be impossible + assert(false); + } } - } - } while (true); + } while (true); + } catch (const std::exception& e) { + QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); + } + PollerHandleDeletionManager.destroyThreadState(); } Poller::Event Poller::wait(Duration timeout) { -- cgit v1.2.1