diff options
| author | Alan Conway <aconway@apache.org> | 2006-11-01 01:19:12 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-11-01 01:19:12 +0000 |
| commit | dda71d21e76e01918ebec2d80dd8e077f94216e0 (patch) | |
| tree | 79283c295e00de1eee8d98d4fd9b781db8497c28 /cpp/src/qpid/concurrent | |
| parent | 9094d2b10ecadd66fa3b22169183e7573cc79629 (diff) | |
| download | qpid-python-dda71d21e76e01918ebec2d80dd8e077f94216e0.tar.gz | |
Moved APR specific sources into src_apr.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/concurrent')
| -rw-r--r-- | cpp/src/qpid/concurrent/APRBase.cpp | 96 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/APRBase.h | 63 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/Monitor.cpp | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/Monitor.h | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/Thread.cpp | 50 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/Thread.h | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/ThreadFactory.cpp | 35 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/ThreadPool.cpp | 83 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/ThreadPool.h | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/concurrent/Time.h (renamed from cpp/src/qpid/concurrent/ThreadFactory.h) | 46 |
10 files changed, 27 insertions, 577 deletions
diff --git a/cpp/src/qpid/concurrent/APRBase.cpp b/cpp/src/qpid/concurrent/APRBase.cpp deleted file mode 100644 index 514c4d1048..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; - -APRBase* APRBase::instance = 0; - -APRBase* APRBase::getInstance(){ - if(instance == 0){ - instance = new APRBase(); - } - return instance; -} - - -APRBase::APRBase() : count(0){ - apr_initialize(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); -} - -APRBase::~APRBase(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - apr_terminate(); -} - -bool APRBase::_increment(){ - bool deleted(false); - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(this == instance){ - count++; - }else{ - deleted = true; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - return !deleted; -} - -void APRBase::_decrement(){ - APRBase* copy = 0; - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(--count == 0){ - copy = instance; - instance = 0; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - if(copy != 0){ - delete copy; - } -} - -void APRBase::increment(){ - int count = 0; - while(count++ < 2 && !getInstance()->_increment()){ - std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; - } -} - -void APRBase::decrement(){ - getInstance()->_decrement(); -} - -void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){ - if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw QpidError(APR_ERROR + ((int) status), msg, file, line); - } -} - -std::string qpid::concurrent::get_desc(apr_status_t status){ - const int size = 50; - char tmp[size]; - return std::string(apr_strerror(status, tmp, size)); -} - diff --git a/cpp/src/qpid/concurrent/APRBase.h b/cpp/src/qpid/concurrent/APRBase.h deleted file mode 100644 index f3ff0f89c1..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRBase_ -#define _APRBase_ - -#include <string> -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_errno.h" - -namespace qpid { -namespace concurrent { - - /** - * Use of APR libraries necessitates explicit init and terminate - * calls. Any class using APR libs should obtain the reference to - * this singleton and increment on construction, decrement on - * destruction. This class can then correctly initialise apr - * before the first use and terminate after the last use. - */ - class APRBase{ - static APRBase* instance; - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - int count; - - APRBase(); - ~APRBase(); - static APRBase* getInstance(); - bool _increment(); - void _decrement(); - public: - static void increment(); - static void decrement(); - }; - - //this is also a convenient place for a helper function for error checking: - void check(apr_status_t status, const std::string& file, const int line); - std::string get_desc(apr_status_t status); - -#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); - -} -} - - - - -#endif diff --git a/cpp/src/qpid/concurrent/Monitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp deleted file mode 100644 index ae68cf8751..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Monitor.h" -#include <iostream> - -qpid::concurrent::Monitor::Monitor(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); -} - -qpid::concurrent::Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - APRBase::decrement(); -} - -void qpid::concurrent::Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - - -void qpid::concurrent::Monitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void qpid::concurrent::Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void qpid::concurrent::Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -void qpid::concurrent::Monitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::concurrent::Monitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h deleted file mode 100644 index a2777cb2f1..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Monitor_ -#define _Monitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -class Monitor -{ - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - Monitor(); - virtual ~Monitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); -}; - -class Locker -{ - public: - Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } - ~Locker() { monitor.release(); } - private: - Monitor& monitor; -}; -}} - - -#endif diff --git a/cpp/src/qpid/concurrent/Thread.cpp b/cpp/src/qpid/concurrent/Thread.cpp deleted file mode 100644 index 9bbc2f8131..0000000000 --- a/cpp/src/qpid/concurrent/Thread.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Thread.h" -#include "apr-1/apr_portable.h" - -using namespace qpid::concurrent; - -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - -Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -Thread::~Thread(){ -} - -void Thread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); -} - -void Thread::join(){ - apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); -} - -void Thread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} - -unsigned int qpid::concurrent::Thread::currentThread(){ - return apr_os_thread_current(); -} diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h deleted file mode 100644 index d18bc153bf..0000000000 --- a/cpp/src/qpid/concurrent/Thread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Thread_ -#define _Thread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - - class Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - - public: - Thread(apr_pool_t* pool, Runnable* runnable); - virtual ~Thread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp deleted file mode 100644 index b20f9f2b04..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/ThreadFactory.h" - -using namespace qpid::concurrent; - -ThreadFactory::ThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -ThreadFactory::~ThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* ThreadFactory::create(Runnable* runnable){ - return new Thread(pool, runnable); -} diff --git a/cpp/src/qpid/concurrent/ThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp deleted file mode 100644 index 5da19745a7..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/QpidError.h" -#include <iostream> - -using namespace qpid::concurrent; - -ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ - worker = new Worker(this); -} - -ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ - worker = new Worker(this); -} - -ThreadPool::~ThreadPool(){ - if(deleteFactory) delete factory; -} - -void ThreadPool::addTask(Runnable* task){ - lock.acquire(); - tasks.push(task); - lock.notifyAll(); - lock.release(); -} - -void ThreadPool::runTask(){ - lock.acquire(); - while(tasks.empty()){ - lock.wait(); - } - Runnable* task = tasks.front(); - tasks.pop(); - lock.release(); - try{ - task->run(); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void ThreadPool::start(){ - if(!running){ - running = true; - for(int i = 0; i < size; i++){ - Thread* t = factory->create(worker); - t->start(); - threads.push_back(t); - } - } -} - -void ThreadPool::stop(){ - if(!running){ - running = false; - lock.acquire(); - lock.notifyAll(); - lock.release(); - for(int i = 0; i < size; i++){ - threads[i]->join(); - delete threads[i]; - } - } -} - - diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h deleted file mode 100644 index 11f0cc364f..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadPool_ -#define _ThreadPool_ - -#include <queue> -#include <vector> -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class ThreadPool - { - class Worker : public virtual Runnable{ - ThreadPool* pool; - public: - inline Worker(ThreadPool* _pool) : pool(_pool){} - inline virtual void run(){ - while(pool->running){ - pool->runTask(); - } - } - }; - const bool deleteFactory; - const int size; - ThreadFactory* factory; - Monitor lock; - std::vector<Thread*> threads; - std::queue<Runnable*> tasks; - Worker* worker; - volatile bool running; - - void runTask(); - public: - ThreadPool(int size); - ThreadPool(int size, ThreadFactory* factory); - virtual void start(); - virtual void stop(); - virtual void addTask(Runnable* task); - virtual ~ThreadPool(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/Time.h index 572419cae6..ec64ce8a85 100644 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ b/cpp/src/qpid/concurrent/Time.h @@ -1,3 +1,6 @@ +#ifndef _concurrent_Time_h +#define _concurrent_Time_h + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,30 +18,35 @@ * limitations under the License. * */ -#ifndef _ThreadFactory_ -#define _ThreadFactory_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" +#include <stdint.h> namespace qpid { namespace concurrent { - class ThreadFactory - { - apr_pool_t* pool; - public: - ThreadFactory(); - virtual ~ThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; +/** + * Time since the epoch. + */ +class Time +{ + public: + static const int64_t NANOS = 1000000000; + static const int64_t MICROS = 1000000; + static const int64_t MILLIS = 1000; + + static Time now(); + + Time(int64_t nsecs_) : ticks(nsecs_) {} + + int64_t nsecs() const { return ticks; } + int64_t usecs() const { return nsecs()/1000; } + int64_t msecs() const { return usecs()/1000; } + int64_t secs() const { return msecs()/1000; } -} -} + private: + int64_t ticks; +}; +}} -#endif +#endif /*!_concurrent_Time_h*/ |
