diff options
Diffstat (limited to 'cpp/src/qpid/concurrent')
16 files changed, 103 insertions, 459 deletions
diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h deleted file mode 100644 index a396beab50..0000000000 --- a/cpp/src/qpid/concurrent/APRMonitor.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRMonitor_ -#define _APRMonitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -    class APRMonitor : public virtual Monitor  -    { -	apr_pool_t* pool; -	apr_thread_mutex_t* mutex; -	apr_thread_cond_t* condition; - -    public: -	APRMonitor(); -	virtual ~APRMonitor(); -	virtual void wait(); -	virtual void wait(u_int64_t time); -	virtual void notify(); -	virtual void notifyAll(); -	virtual void acquire(); -	virtual void release(); -    }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h deleted file mode 100644 index 6328765a06..0000000000 --- a/cpp/src/qpid/concurrent/APRThread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThread_ -#define _APRThread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - -    class APRThread : public Thread -    { -	const Runnable* runnable; -	apr_pool_t* pool; -	apr_thread_t* runner; - -    public: -	APRThread(apr_pool_t* pool, Runnable* runnable); -	virtual ~APRThread(); -	virtual void start(); -	virtual void join(); -	virtual void interrupt(); -        static unsigned int currentThread(); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h deleted file mode 100644 index 40e96fc2d1..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadFactory_ -#define _APRThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - -    class APRThreadFactory : public virtual ThreadFactory -    { -	apr_pool_t* pool; -    public: -	APRThreadFactory(); -	virtual ~APRThreadFactory(); -	virtual Thread* create(Runnable* runnable); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h deleted file mode 100644 index cab5bcc9ce..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadPool_ -#define _APRThreadPool_ - -#include <queue> -#include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - -    class APRThreadPool : public virtual ThreadPool -    { -        class Worker : public virtual Runnable{ -            APRThreadPool* pool; -        public: -            inline Worker(APRThreadPool* _pool) : pool(_pool){} -            inline virtual void run(){ -                while(pool->running){ -                    pool->runTask(); -                } -            } -        }; -        const bool deleteFactory; -        const int size; -        ThreadFactory* factory; -        APRMonitor lock;  -        std::vector<Thread*> threads; -        std::queue<Runnable*> tasks; -        Worker* worker; -        volatile bool running; - -        void runTask(); -    public: -        APRThreadPool(int size); -        APRThreadPool(int size, ThreadFactory* factory); -        virtual void start(); -        virtual void stop(); -	virtual void addTask(Runnable* task); -        virtual ~APRThreadPool(); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/concurrent/LMonitor.h deleted file mode 100644 index 70e99b9807..0000000000 --- a/cpp/src/qpid/concurrent/LMonitor.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LMonitor_ -#define _LMonitor_ - -/* Native Linux Monitor - Based of Kernel patch 19/20 */ - -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -    class LMonitor : public virtual Monitor  -    { - -    public: -	LMonitor(); -	virtual ~LMonitor(); -	virtual void wait(); -	virtual void notify(); -	virtual void notifyAll(); -	virtual void acquire(); -	virtual void release(); -    }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/concurrent/LThreadFactory.h deleted file mode 100644 index 4a573d1bd1..0000000000 --- a/cpp/src/qpid/concurrent/LThreadFactory.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LAPRThreadFactory_ -#define _LAPRThreadFactory_ - - -namespace qpid { -namespace concurrent { - -    class LThreadFactory  -    { -    public: -	LThreadFactory(); -	virtual ~LThreadFactory(); -	virtual Thread* create(Runnable* runnable); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp index cc5eda800f..ae68cf8751 100644 --- a/cpp/src/qpid/concurrent/APRMonitor.cpp +++ b/cpp/src/qpid/concurrent/Monitor.cpp @@ -16,45 +16,45 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h"  #include <iostream> -qpid::concurrent::APRMonitor::APRMonitor(){ +qpid::concurrent::Monitor::Monitor(){      APRBase::increment();      CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));      CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));      CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));  } -qpid::concurrent::APRMonitor::~APRMonitor(){ +qpid::concurrent::Monitor::~Monitor(){      CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));      CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));      apr_pool_destroy(pool);      APRBase::decrement();  } -void qpid::concurrent::APRMonitor::wait(){ +void qpid::concurrent::Monitor::wait(){      CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));  } -void qpid::concurrent::APRMonitor::wait(u_int64_t time){ +void qpid::concurrent::Monitor::wait(u_int64_t time){      apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);      if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);  } -void qpid::concurrent::APRMonitor::notify(){ +void qpid::concurrent::Monitor::notify(){      CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));  } -void qpid::concurrent::APRMonitor::notifyAll(){ +void qpid::concurrent::Monitor::notifyAll(){      CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));  } -void qpid::concurrent::APRMonitor::acquire(){ +void qpid::concurrent::Monitor::acquire(){      CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));  } -void qpid::concurrent::APRMonitor::release(){ +void qpid::concurrent::Monitor::release(){      CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));  } diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h index 42e88c0a48..a2777cb2f1 100644 --- a/cpp/src/qpid/concurrent/Monitor.h +++ b/cpp/src/qpid/concurrent/Monitor.h @@ -18,42 +18,39 @@  #ifndef _Monitor_  #define _Monitor_ -#include "qpid/framing/amqp_types.h" +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/concurrent/Monitor.h"  namespace qpid {  namespace concurrent {  class Monitor  { +    apr_pool_t* pool; +    apr_thread_mutex_t* mutex; +    apr_thread_cond_t* condition; +    public: -    virtual ~Monitor(){} -    virtual void wait() = 0; -    virtual void wait(u_int64_t time) = 0; -    virtual void notify() = 0; -    virtual void notifyAll() = 0; -    virtual void acquire() = 0; -    virtual void release() = 0; +    Monitor(); +    virtual ~Monitor(); +    virtual void wait(); +    virtual void wait(u_int64_t time); +    virtual void notify(); +    virtual void notifyAll(); +    virtual void acquire(); +    virtual void release();  }; -/** - * Scoped locker for a monitor. - */  class Locker  {    public: -    Locker(Monitor&  lock_) : lock(lock_) { lock.acquire(); } -    ~Locker() { lock.release(); } - +    Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } +    ~Locker() { monitor.release(); }    private: -    Monitor& lock; - -    // private and unimplemented to prevent copying -    Locker(const Locker&); -    void operator=(const Locker&); +    Monitor& monitor;  }; - -} -} +}}  #endif diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h deleted file mode 100644 index 258ad140b3..0000000000 --- a/cpp/src/qpid/concurrent/MonitorImpl.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -  -  -#ifndef _MonitorImpl_ -#define _MonitorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRMonitor.h" -#else /* use POSIX Monitor */ -#include "qpid/concurrent/LMonitor.h"   -#endif - - -namespace qpid { -namespace concurrent { - -#ifdef _USE_APR_IO_ -    class MonitorImpl : public virtual APRMonitor  -    { - -    public: -	MonitorImpl() : APRMonitor(){}; -	virtual ~MonitorImpl(){}; - -    }; -#else -    class MonitorImpl : public virtual LMonitor  -    { - -    public: -	MonitorImpl() : LMonitor(){}; -	virtual ~MonitorImpl(){}; - -    };    -#endif -     -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/Thread.cpp index d4d073cac6..9bbc2f8131 100644 --- a/cpp/src/qpid/concurrent/APRThread.cpp +++ b/cpp/src/qpid/concurrent/Thread.cpp @@ -16,7 +16,7 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThread.h" +#include "qpid/concurrent/Thread.h"  #include "apr-1/apr_portable.h"  using namespace qpid::concurrent; @@ -27,24 +27,24 @@ void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){      return NULL;  }  -APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} -APRThread::~APRThread(){ +Thread::~Thread(){  } -void APRThread::start(){ +void Thread::start(){      CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));  } -void APRThread::join(){ +void Thread::join(){      apr_status_t status;      if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));  } -void APRThread::interrupt(){ +void Thread::interrupt(){      if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));  } -unsigned int qpid::concurrent::APRThread::currentThread(){ +unsigned int qpid::concurrent::Thread::currentThread(){      return apr_os_thread_current();  } diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h index 6bd2a379ce..d18bc153bf 100644 --- a/cpp/src/qpid/concurrent/Thread.h +++ b/cpp/src/qpid/concurrent/Thread.h @@ -18,16 +18,27 @@  #ifndef _Thread_  #define _Thread_ +#include "apr-1/apr_thread_proc.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/Thread.h" +  namespace qpid {  namespace concurrent {      class Thread      { +	const Runnable* runnable; +	apr_pool_t* pool; +	apr_thread_t* runner; +      public: -        virtual ~Thread(){} -	virtual void start() = 0; -	virtual void join() = 0; -	virtual void interrupt() = 0; +	Thread(apr_pool_t* pool, Runnable* runnable); +	virtual ~Thread(); +	virtual void start(); +	virtual void join(); +	virtual void interrupt(); +        static unsigned int currentThread();      };  } diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp index 1c99a3da33..b20f9f2b04 100644 --- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp +++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp @@ -16,20 +16,20 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/ThreadFactory.h"  using namespace qpid::concurrent; -APRThreadFactory::APRThreadFactory(){ +ThreadFactory::ThreadFactory(){      APRBase::increment();      CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));  } -APRThreadFactory::~APRThreadFactory(){ +ThreadFactory::~ThreadFactory(){      apr_pool_destroy(pool);      APRBase::decrement();  } -Thread* APRThreadFactory::create(Runnable* runnable){ -    return new APRThread(pool, runnable); +Thread* ThreadFactory::create(Runnable* runnable){ +    return new Thread(pool, runnable);  } diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h index 60c8ad2556..572419cae6 100644 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ b/cpp/src/qpid/concurrent/ThreadFactory.h @@ -18,7 +18,11 @@  #ifndef _ThreadFactory_  #define _ThreadFactory_ +#include "apr-1/apr_thread_proc.h" + +#include "qpid/concurrent/Thread.h"  #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h"  #include "qpid/concurrent/Runnable.h"  namespace qpid { @@ -26,9 +30,11 @@ namespace concurrent {      class ThreadFactory      { +	apr_pool_t* pool;      public: -        virtual ~ThreadFactory(){} -	virtual Thread* create(Runnable* runnable) = 0; +	ThreadFactory(); +	virtual ~ThreadFactory(); +	virtual Thread* create(Runnable* runnable);      };  } diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h deleted file mode 100644 index 352b77ac21..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactoryImpl_ -#define _ThreadFactoryImpl_ - - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRThreadFactory.h" -#else -#include "qpid/concurrent/LThreadFactory.h" -#endif - - -namespace qpid { -namespace concurrent { - - -#ifdef _USE_APR_IO_ -    class ThreadFactoryImpl : public virtual APRThreadFactory -    { -    public: -	ThreadFactoryImpl(): APRThreadFactory() {}; -	virtual ~ThreadFactoryImpl() {}; -    }; -#else -    class ThreadFactoryImpl : public virtual LThreadFactory -    { -    public: -	ThreadFactoryImpl(): LThreadFactory() {}; -	virtual ~ThreadFactoryImpl() {}; -    }; -#endif -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp index 3222c71b0c..5da19745a7 100644 --- a/cpp/src/qpid/concurrent/APRThreadPool.cpp +++ b/cpp/src/qpid/concurrent/ThreadPool.cpp @@ -15,33 +15,33 @@   * limitations under the License.   *   */ -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h"  #include "qpid/QpidError.h"  #include <iostream>  using namespace qpid::concurrent; -APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){ +ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){      worker = new Worker(this);  } -APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) :     deleteFactory(false), size(_size), factory(_factory), running(false){ +ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) :     deleteFactory(false), size(_size), factory(_factory), running(false){      worker = new Worker(this);  } -APRThreadPool::~APRThreadPool(){ +ThreadPool::~ThreadPool(){      if(deleteFactory) delete factory;  } -void APRThreadPool::addTask(Runnable* task){ +void ThreadPool::addTask(Runnable* task){      lock.acquire();      tasks.push(task);      lock.notifyAll();      lock.release();  } -void APRThreadPool::runTask(){ +void ThreadPool::runTask(){      lock.acquire();      while(tasks.empty()){          lock.wait(); @@ -56,7 +56,7 @@ void APRThreadPool::runTask(){      }  } -void APRThreadPool::start(){ +void ThreadPool::start(){      if(!running){          running = true;          for(int i = 0; i < size; i++){ @@ -67,7 +67,7 @@ void APRThreadPool::start(){      }  } -void APRThreadPool::stop(){ +void ThreadPool::stop(){      if(!running){          running = false;          lock.acquire(); diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h index 925faa76de..11f0cc364f 100644 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ b/cpp/src/qpid/concurrent/ThreadPool.h @@ -18,7 +18,12 @@  #ifndef _ThreadPool_  #define _ThreadPool_ +#include <queue> +#include <vector> +#include "qpid/concurrent/Monitor.h"  #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h"  #include "qpid/concurrent/Runnable.h"  namespace qpid { @@ -26,11 +31,33 @@ namespace concurrent {      class ThreadPool      { +        class Worker : public virtual Runnable{ +            ThreadPool* pool; +        public: +            inline Worker(ThreadPool* _pool) : pool(_pool){} +            inline virtual void run(){ +                while(pool->running){ +                    pool->runTask(); +                } +            } +        }; +        const bool deleteFactory; +        const int size; +        ThreadFactory* factory; +        Monitor lock;  +        std::vector<Thread*> threads; +        std::queue<Runnable*> tasks; +        Worker* worker; +        volatile bool running; + +        void runTask();      public: -        virtual void start() = 0; -        virtual void stop() = 0; -	virtual void addTask(Runnable* runnable) = 0; -        virtual ~ThreadPool(){} +        ThreadPool(int size); +        ThreadPool(int size, ThreadFactory* factory); +        virtual void start(); +        virtual void stop(); +	virtual void addTask(Runnable* task); +        virtual ~ThreadPool();      };  }  | 
