summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannelThreads.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannelThreads.h')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelThreads.h43
1 files changed, 28 insertions, 15 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h
index 245cefe585..19112cf4db 100644
--- a/cpp/src/qpid/sys/posix/EventChannelThreads.h
+++ b/cpp/src/qpid/sys/posix/EventChannelThreads.h
@@ -18,14 +18,16 @@
* limitations under the License.
*
*/
-#include <vector>
+#include "EventChannel.h"
#include "qpid/Exception.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/AtomicCount.h"
-#include "EventChannel.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
namespace qpid {
namespace sys {
@@ -33,26 +35,33 @@ namespace sys {
/**
Dynamic thread pool serving an EventChannel.
- Threads run a loop { e = getEvent(); e->dispatch(); }
+ Threads run a loop { e = wait(); e->dispatch(); }
The size of the thread pool is automatically adjusted to optimal size.
*/
class EventChannelThreads :
public qpid::SharedObject<EventChannelThreads>,
- public sys::Monitor, private sys::Runnable
+ private sys::Runnable
{
public:
- /** Create the thread pool and start initial threads. */
+ /** Constant to represent an unlimited number of threads */
+ static const size_t unlimited;
+
+ /**
+ * Create the thread pool and start initial threads.
+ * @param minThreads Pool will initialy contain minThreads threads and
+ * will never shrink to less until shutdown.
+ * @param maxThreads Pool will never grow to more than maxThreads.
+ */
static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel
+ EventChannel::shared_ptr channel = EventChannel::create(),
+ size_t minThreads = 1,
+ size_t maxThreads = unlimited
);
~EventChannelThreads();
/** Post event to the underlying channel */
- void postEvent(Event& event) { channel->postEvent(event); }
-
- /** Post event to the underlying channel Must not be 0. */
- void postEvent(Event* event) { channel->postEvent(event); }
+ void post(Event& event) { channel->post(event); }
/**
* Terminate all threads.
@@ -68,21 +77,25 @@ class EventChannelThreads :
private:
typedef std::vector<sys::Thread> Threads;
typedef enum {
- RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ RUNNING, TERMINATING, JOINING, SHUTDOWN
} State;
- EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ EventChannelThreads(
+ EventChannel::shared_ptr channel, size_t min, size_t max);
+
void addThread();
void run();
bool keepRunning();
void adjustThreads();
+ Monitor monitor;
+ size_t minThreads;
+ size_t maxThreads;
EventChannel::shared_ptr channel;
Threads workers;
sys::AtomicCount nWaiting;
State state;
- Event terminate;
};