From 9094d2b10ecadd66fa3b22169183e7573cc79629 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 31 Oct 2006 19:53:55 +0000 Subject: IO refactor phase 1. Reduced dependencies, removed redundant classes. Renamed pricipal APR classes in preparation for move to apr namespace. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/concurrent/APRMonitor.cpp | 60 -------------------- cpp/src/qpid/concurrent/APRMonitor.h | 48 ---------------- cpp/src/qpid/concurrent/APRThread.cpp | 50 ----------------- cpp/src/qpid/concurrent/APRThread.h | 48 ---------------- cpp/src/qpid/concurrent/APRThreadFactory.cpp | 35 ------------ cpp/src/qpid/concurrent/APRThreadFactory.h | 44 --------------- cpp/src/qpid/concurrent/APRThreadPool.cpp | 83 ---------------------------- cpp/src/qpid/concurrent/APRThreadPool.h | 67 ---------------------- cpp/src/qpid/concurrent/LMonitor.h | 44 --------------- cpp/src/qpid/concurrent/LThreadFactory.h | 37 ------------- cpp/src/qpid/concurrent/Monitor.cpp | 60 ++++++++++++++++++++ cpp/src/qpid/concurrent/Monitor.h | 41 +++++++------- cpp/src/qpid/concurrent/MonitorImpl.h | 57 ------------------- cpp/src/qpid/concurrent/Thread.cpp | 50 +++++++++++++++++ cpp/src/qpid/concurrent/Thread.h | 19 +++++-- cpp/src/qpid/concurrent/ThreadFactory.cpp | 35 ++++++++++++ cpp/src/qpid/concurrent/ThreadFactory.h | 10 +++- cpp/src/qpid/concurrent/ThreadFactoryImpl.h | 52 ----------------- cpp/src/qpid/concurrent/ThreadPool.cpp | 83 ++++++++++++++++++++++++++++ cpp/src/qpid/concurrent/ThreadPool.h | 35 ++++++++++-- 20 files changed, 301 insertions(+), 657 deletions(-) delete mode 100644 cpp/src/qpid/concurrent/APRMonitor.cpp delete mode 100644 cpp/src/qpid/concurrent/APRMonitor.h delete mode 100644 cpp/src/qpid/concurrent/APRThread.cpp delete mode 100644 cpp/src/qpid/concurrent/APRThread.h delete mode 100644 cpp/src/qpid/concurrent/APRThreadFactory.cpp delete mode 100644 cpp/src/qpid/concurrent/APRThreadFactory.h delete mode 100644 cpp/src/qpid/concurrent/APRThreadPool.cpp delete mode 100644 cpp/src/qpid/concurrent/APRThreadPool.h delete mode 100644 cpp/src/qpid/concurrent/LMonitor.h delete mode 100644 cpp/src/qpid/concurrent/LThreadFactory.h create mode 100644 cpp/src/qpid/concurrent/Monitor.cpp delete mode 100644 cpp/src/qpid/concurrent/MonitorImpl.h create mode 100644 cpp/src/qpid/concurrent/Thread.cpp create mode 100644 cpp/src/qpid/concurrent/ThreadFactory.cpp delete mode 100644 cpp/src/qpid/concurrent/ThreadFactoryImpl.h create mode 100644 cpp/src/qpid/concurrent/ThreadPool.cpp (limited to 'cpp/src/qpid/concurrent') diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/APRMonitor.cpp deleted file mode 100644 index cc5eda800f..0000000000 --- a/cpp/src/qpid/concurrent/APRMonitor.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRMonitor.h" -#include - -qpid::concurrent::APRMonitor::APRMonitor(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); -} - -qpid::concurrent::APRMonitor::~APRMonitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - APRBase::decrement(); -} - -void qpid::concurrent::APRMonitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - - -void qpid::concurrent::APRMonitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void qpid::concurrent::APRMonitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void qpid::concurrent::APRMonitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -void qpid::concurrent::APRMonitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::concurrent::APRMonitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h deleted file mode 100644 index a396beab50..0000000000 --- a/cpp/src/qpid/concurrent/APRMonitor.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRMonitor_ -#define _APRMonitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - - class APRMonitor : public virtual Monitor - { - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - APRMonitor(); - virtual ~APRMonitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); - }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/APRThread.cpp deleted file mode 100644 index d4d073cac6..0000000000 --- a/cpp/src/qpid/concurrent/APRThread.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThread.h" -#include "apr-1/apr_portable.h" - -using namespace qpid::concurrent; - -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - -APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -APRThread::~APRThread(){ -} - -void APRThread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); -} - -void APRThread::join(){ - apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); -} - -void APRThread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} - -unsigned int qpid::concurrent::APRThread::currentThread(){ - return apr_os_thread_current(); -} diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h deleted file mode 100644 index 6328765a06..0000000000 --- a/cpp/src/qpid/concurrent/APRThread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThread_ -#define _APRThread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - - class APRThread : public Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - - public: - APRThread(apr_pool_t* pool, Runnable* runnable); - virtual ~APRThread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/APRThreadFactory.cpp deleted file mode 100644 index 1c99a3da33..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" - -using namespace qpid::concurrent; - -APRThreadFactory::APRThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -APRThreadFactory::~APRThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* APRThreadFactory::create(Runnable* runnable){ - return new APRThread(pool, runnable); -} diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h deleted file mode 100644 index 40e96fc2d1..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadFactory_ -#define _APRThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class APRThreadFactory : public virtual ThreadFactory - { - apr_pool_t* pool; - public: - APRThreadFactory(); - virtual ~APRThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/APRThreadPool.cpp deleted file mode 100644 index 3222c71b0c..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadPool.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::concurrent; - -APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){ - worker = new Worker(this); -} - -APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ - worker = new Worker(this); -} - -APRThreadPool::~APRThreadPool(){ - if(deleteFactory) delete factory; -} - -void APRThreadPool::addTask(Runnable* task){ - lock.acquire(); - tasks.push(task); - lock.notifyAll(); - lock.release(); -} - -void APRThreadPool::runTask(){ - lock.acquire(); - while(tasks.empty()){ - lock.wait(); - } - Runnable* task = tasks.front(); - tasks.pop(); - lock.release(); - try{ - task->run(); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void APRThreadPool::start(){ - if(!running){ - running = true; - for(int i = 0; i < size; i++){ - Thread* t = factory->create(worker); - t->start(); - threads.push_back(t); - } - } -} - -void APRThreadPool::stop(){ - if(!running){ - running = false; - lock.acquire(); - lock.notifyAll(); - lock.release(); - for(int i = 0; i < size; i++){ - threads[i]->join(); - delete threads[i]; - } - } -} - - diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h deleted file mode 100644 index cab5bcc9ce..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadPool_ -#define _APRThreadPool_ - -#include -#include -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class APRThreadPool : public virtual ThreadPool - { - class Worker : public virtual Runnable{ - APRThreadPool* pool; - public: - inline Worker(APRThreadPool* _pool) : pool(_pool){} - inline virtual void run(){ - while(pool->running){ - pool->runTask(); - } - } - }; - const bool deleteFactory; - const int size; - ThreadFactory* factory; - APRMonitor lock; - std::vector threads; - std::queue tasks; - Worker* worker; - volatile bool running; - - void runTask(); - public: - APRThreadPool(int size); - APRThreadPool(int size, ThreadFactory* factory); - virtual void start(); - virtual void stop(); - virtual void addTask(Runnable* task); - virtual ~APRThreadPool(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/concurrent/LMonitor.h deleted file mode 100644 index 70e99b9807..0000000000 --- a/cpp/src/qpid/concurrent/LMonitor.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LMonitor_ -#define _LMonitor_ - -/* Native Linux Monitor - Based of Kernel patch 19/20 */ - -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - - class LMonitor : public virtual Monitor - { - - public: - LMonitor(); - virtual ~LMonitor(); - virtual void wait(); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); - }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/concurrent/LThreadFactory.h deleted file mode 100644 index 4a573d1bd1..0000000000 --- a/cpp/src/qpid/concurrent/LThreadFactory.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LAPRThreadFactory_ -#define _LAPRThreadFactory_ - - -namespace qpid { -namespace concurrent { - - class LThreadFactory - { - public: - LThreadFactory(); - virtual ~LThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/Monitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp new file mode 100644 index 0000000000..ae68cf8751 --- /dev/null +++ b/cpp/src/qpid/concurrent/Monitor.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/Monitor.h" +#include + +qpid::concurrent::Monitor::Monitor(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); +} + +qpid::concurrent::Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + APRBase::decrement(); +} + +void qpid::concurrent::Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + + +void qpid::concurrent::Monitor::wait(u_int64_t time){ + apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void qpid::concurrent::Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void qpid::concurrent::Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +void qpid::concurrent::Monitor::acquire(){ + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} + +void qpid::concurrent::Monitor::release(){ + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h index 42e88c0a48..a2777cb2f1 100644 --- a/cpp/src/qpid/concurrent/Monitor.h +++ b/cpp/src/qpid/concurrent/Monitor.h @@ -18,42 +18,39 @@ #ifndef _Monitor_ #define _Monitor_ -#include "qpid/framing/amqp_types.h" +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace concurrent { class Monitor { + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + public: - virtual ~Monitor(){} - virtual void wait() = 0; - virtual void wait(u_int64_t time) = 0; - virtual void notify() = 0; - virtual void notifyAll() = 0; - virtual void acquire() = 0; - virtual void release() = 0; + Monitor(); + virtual ~Monitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); }; -/** - * Scoped locker for a monitor. - */ class Locker { public: - Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); } - ~Locker() { lock.release(); } - + Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } + ~Locker() { monitor.release(); } private: - Monitor& lock; - - // private and unimplemented to prevent copying - Locker(const Locker&); - void operator=(const Locker&); + Monitor& monitor; }; - -} -} +}} #endif diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h deleted file mode 100644 index 258ad140b3..0000000000 --- a/cpp/src/qpid/concurrent/MonitorImpl.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -#ifndef _MonitorImpl_ -#define _MonitorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRMonitor.h" -#else /* use POSIX Monitor */ -#include "qpid/concurrent/LMonitor.h" -#endif - - -namespace qpid { -namespace concurrent { - -#ifdef _USE_APR_IO_ - class MonitorImpl : public virtual APRMonitor - { - - public: - MonitorImpl() : APRMonitor(){}; - virtual ~MonitorImpl(){}; - - }; -#else - class MonitorImpl : public virtual LMonitor - { - - public: - MonitorImpl() : LMonitor(){}; - virtual ~MonitorImpl(){}; - - }; -#endif - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/Thread.cpp b/cpp/src/qpid/concurrent/Thread.cpp new file mode 100644 index 0000000000..9bbc2f8131 --- /dev/null +++ b/cpp/src/qpid/concurrent/Thread.cpp @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/Thread.h" +#include "apr-1/apr_portable.h" + +using namespace qpid::concurrent; + +void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ + ((Runnable*) data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} + +Thread::~Thread(){ +} + +void Thread::start(){ + CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +} + +void Thread::join(){ + apr_status_t status; + if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); +} + +void Thread::interrupt(){ + if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); +} + +unsigned int qpid::concurrent::Thread::currentThread(){ + return apr_os_thread_current(); +} diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h index 6bd2a379ce..d18bc153bf 100644 --- a/cpp/src/qpid/concurrent/Thread.h +++ b/cpp/src/qpid/concurrent/Thread.h @@ -18,16 +18,27 @@ #ifndef _Thread_ #define _Thread_ +#include "apr-1/apr_thread_proc.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/Thread.h" + namespace qpid { namespace concurrent { class Thread { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + public: - virtual ~Thread(){} - virtual void start() = 0; - virtual void join() = 0; - virtual void interrupt() = 0; + Thread(apr_pool_t* pool, Runnable* runnable); + virtual ~Thread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); }; } diff --git a/cpp/src/qpid/concurrent/ThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp new file mode 100644 index 0000000000..b20f9f2b04 --- /dev/null +++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/ThreadFactory.h" + +using namespace qpid::concurrent; + +ThreadFactory::ThreadFactory(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +ThreadFactory::~ThreadFactory(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +Thread* ThreadFactory::create(Runnable* runnable){ + return new Thread(pool, runnable); +} diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h index 60c8ad2556..572419cae6 100644 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ b/cpp/src/qpid/concurrent/ThreadFactory.h @@ -18,7 +18,11 @@ #ifndef _ThreadFactory_ #define _ThreadFactory_ +#include "apr-1/apr_thread_proc.h" + +#include "qpid/concurrent/Thread.h" #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -26,9 +30,11 @@ namespace concurrent { class ThreadFactory { + apr_pool_t* pool; public: - virtual ~ThreadFactory(){} - virtual Thread* create(Runnable* runnable) = 0; + ThreadFactory(); + virtual ~ThreadFactory(); + virtual Thread* create(Runnable* runnable); }; } diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h deleted file mode 100644 index 352b77ac21..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactoryImpl_ -#define _ThreadFactoryImpl_ - - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRThreadFactory.h" -#else -#include "qpid/concurrent/LThreadFactory.h" -#endif - - -namespace qpid { -namespace concurrent { - - -#ifdef _USE_APR_IO_ - class ThreadFactoryImpl : public virtual APRThreadFactory - { - public: - ThreadFactoryImpl(): APRThreadFactory() {}; - virtual ~ThreadFactoryImpl() {}; - }; -#else - class ThreadFactoryImpl : public virtual LThreadFactory - { - public: - ThreadFactoryImpl(): LThreadFactory() {}; - virtual ~ThreadFactoryImpl() {}; - }; -#endif -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp new file mode 100644 index 0000000000..5da19745a7 --- /dev/null +++ b/cpp/src/qpid/concurrent/ThreadPool.cpp @@ -0,0 +1,83 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/QpidError.h" +#include + +using namespace qpid::concurrent; + +ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ + worker = new Worker(this); +} + +ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ + worker = new Worker(this); +} + +ThreadPool::~ThreadPool(){ + if(deleteFactory) delete factory; +} + +void ThreadPool::addTask(Runnable* task){ + lock.acquire(); + tasks.push(task); + lock.notifyAll(); + lock.release(); +} + +void ThreadPool::runTask(){ + lock.acquire(); + while(tasks.empty()){ + lock.wait(); + } + Runnable* task = tasks.front(); + tasks.pop(); + lock.release(); + try{ + task->run(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void ThreadPool::start(){ + if(!running){ + running = true; + for(int i = 0; i < size; i++){ + Thread* t = factory->create(worker); + t->start(); + threads.push_back(t); + } + } +} + +void ThreadPool::stop(){ + if(!running){ + running = false; + lock.acquire(); + lock.notifyAll(); + lock.release(); + for(int i = 0; i < size; i++){ + threads[i]->join(); + delete threads[i]; + } + } +} + + diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h index 925faa76de..11f0cc364f 100644 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ b/cpp/src/qpid/concurrent/ThreadPool.h @@ -18,7 +18,12 @@ #ifndef _ThreadPool_ #define _ThreadPool_ +#include +#include +#include "qpid/concurrent/Monitor.h" #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -26,11 +31,33 @@ namespace concurrent { class ThreadPool { + class Worker : public virtual Runnable{ + ThreadPool* pool; + public: + inline Worker(ThreadPool* _pool) : pool(_pool){} + inline virtual void run(){ + while(pool->running){ + pool->runTask(); + } + } + }; + const bool deleteFactory; + const int size; + ThreadFactory* factory; + Monitor lock; + std::vector threads; + std::queue tasks; + Worker* worker; + volatile bool running; + + void runTask(); public: - virtual void start() = 0; - virtual void stop() = 0; - virtual void addTask(Runnable* runnable) = 0; - virtual ~ThreadPool(){} + ThreadPool(int size); + ThreadPool(int size, ThreadFactory* factory); + virtual void start(); + virtual void stop(); + virtual void addTask(Runnable* task); + virtual ~ThreadPool(); }; } -- cgit v1.2.1