diff options
| author | Alan Conway <aconway@apache.org> | 2006-11-09 21:55:34 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-11-09 21:55:34 +0000 |
| commit | c33f3d8550b9b4455ad6ca8a2327a7bd9d6f7db1 (patch) | |
| tree | bb5d68281986eb1664c227d15f303664a65d5e03 /cpp/src/qpid/sys | |
| parent | 76fb78a8495b6cd48c633e8b6219b29761133d82 (diff) | |
| download | qpid-python-c33f3d8550b9b4455ad6ca8a2327a7bd9d6f7db1.tar.gz | |
Added POSIX equivalents to APR classes used by clients, inlined trivial calls.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@473087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Monitor.h | 206 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/SessionContext.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/SessionHandler.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Socket.h | 42 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Thread.h | 80 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Time.cpp | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Time.h | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/doxygen_summary.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/platform.h | 29 |
10 files changed, 449 insertions, 56 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index c192cae811..aaced4a673 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -19,8 +19,26 @@ * */ -#include "platform.h" -#include QPID_PLATFORM_H(Acceptor.h) +#include <stdint.h> +#include <qpid/SharedObject.h> +namespace qpid { +namespace sys { +class SessionHandlerFactory; + +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); + virtual ~Acceptor() = 0; + virtual int16_t getPort() const = 0; + virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + virtual void shutdown() = 0; +}; + +}} + + + #endif /*!_sys_Acceptor_h*/ diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h index 7d7b7ac0cf..f5810fb6a7 100644 --- a/cpp/src/qpid/sys/Monitor.h +++ b/cpp/src/qpid/sys/Monitor.h @@ -19,7 +19,209 @@ * */ -#include <qpid/sys/platform.h> -#include QPID_PLATFORM_H(Monitor.h) +#include <boost/noncopyable.hpp> +#ifdef USE_APR +# include <apr-1/apr_thread_mutex.h> +# include <apr-1/apr_thread_cond.h> +# include <qpid/apr/APRBase.h> +# include <qpid/apr/APRPool.h> +#else +# include <pthread.h> +# include <qpid/sys/Time.h> +# include <qpid/posix/check.h> +#endif + +namespace qpid { +namespace sys { + +template <class L> +class ScopedLock +{ + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + + +class Mutex : private boost::noncopyable +{ + public: + typedef ScopedLock<Mutex> ScopedLock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: +#ifdef USE_APR + apr_thread_mutex_t* mutex; +#else + pthread_mutex_t mutex; +#endif +}; + +/** A condition variable and a mutex */ +class Monitor : public Mutex +{ + public: + inline Monitor(); + inline ~Monitor(); + inline void wait(); + inline bool wait(int64_t nsecs); + inline void notify(); + inline void notifyAll(); + + private: +#ifdef USE_APR + apr_thread_cond_t* condition; +#else + pthread_cond_t condition; +#endif +}; + + +// APR ================================================================ +#ifdef USE_APR + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +Monitor::Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +bool Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); + if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); + return status == 0; +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + + +}} + + +// POSIX ================================================================ +#else +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + + +void PODMutex::lock() { + CHECK(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + CHECK(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + CHECK(pthread_mutex_trylock(&mutex)); +} + + +Mutex::Mutex() { + CHECK(pthread_mutex_init(&mutex, 0)); +} + +Mutex::~Mutex(){ + CHECK(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + CHECK(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + CHECK(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + CHECK(pthread_mutex_trylock(&mutex)); +} + +Monitor::Monitor() { + CHECK(pthread_cond_init(&condition, 0)); +} + +Monitor::~Monitor() { + CHECK(pthread_cond_destroy(&condition)); +} + +void Monitor::wait() { + CHECK(pthread_cond_wait(&condition, &mutex)); +} + +bool Monitor::wait(int64_t nsecs){ + Time t(nsecs); + int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec()); + if(status != 0) { + if (errno == ETIMEDOUT) return false; + CHECK(status); + } + return true; +} + +void Monitor::notify(){ + CHECK(pthread_cond_signal(&condition)); +} + +void Monitor::notifyAll(){ + CHECK(pthread_cond_broadcast(&condition)); +} + + +}} +#endif /*USE_APR*/ #endif /*!_sys_Monitor_h*/ diff --git a/cpp/src/qpid/sys/SessionContext.h b/cpp/src/qpid/sys/SessionContext.h index 1362b4f2f2..3beddaed8f 100644 --- a/cpp/src/qpid/sys/SessionContext.h +++ b/cpp/src/qpid/sys/SessionContext.h @@ -18,7 +18,7 @@ #ifndef _SessionContext_ #define _SessionContext_ -#include "qpid/framing/OutputHandler.h" +#include <qpid/framing/OutputHandler.h> namespace qpid { namespace sys { diff --git a/cpp/src/qpid/sys/SessionHandler.h b/cpp/src/qpid/sys/SessionHandler.h index 130e69caf4..ce8b51ad47 100644 --- a/cpp/src/qpid/sys/SessionHandler.h +++ b/cpp/src/qpid/sys/SessionHandler.h @@ -18,10 +18,10 @@ #ifndef _SessionHandler_ #define _SessionHandler_ -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/TimeoutHandler.h" +#include <qpid/framing/InputHandler.h> +#include <qpid/framing/InitiationHandler.h> +#include <qpid/framing/ProtocolInitiation.h> +#include <qpid/sys/TimeoutHandler.h> namespace qpid { namespace sys { diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 243764353e..9853d98496 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -19,9 +19,47 @@ * */ -#include <qpid/sys/platform.h> -#include QPID_PLATFORM_H(Socket.h) +#include <string> +#ifdef USE_APR +# include <apr-1/apr_network_io.h> +#endif + +namespace qpid { +namespace sys { + +class Socket +{ + public: + Socket(); + + /** Set timeout for read and write */ + void setTimeout(long msecs); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const char* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(char* data, size_t size); + + private: +#ifdef USE_APR + apr_socket_t* socket; +#else + int socket; +#endif +}; + +}} #endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h index d884add776..0cd7a5b2eb 100644 --- a/cpp/src/qpid/sys/Thread.h +++ b/cpp/src/qpid/sys/Thread.h @@ -19,9 +19,85 @@ * */ -#include <qpid/sys/platform.h> -#include QPID_PLATFORM_H(Thread.h) +#include <qpid/sys/Runnable.h> +#ifdef USE_APR +# include <apr-1/apr_thread_proc.h> +# include <apr-1/apr_portable.h> +# include <qpid/apr/APRPool.h> +# include <qpid/apr/APRBase.h> +#else +# include <qpid/posix/check.h> +# include <pthread.h> +#endif +namespace qpid { +namespace sys { + +class Thread +{ + public: + inline Thread(); + inline explicit Thread(qpid::sys::Runnable*); + inline void join(); + inline static Thread current(); + + private: +#ifdef USE_APR + static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); + inline Thread(apr_thread_t* t); + apr_thread_t* thread; +#else + static void* runRunnable(void* runnable); + inline Thread(pthread_t); + pthread_t thread; +#endif +}; + + +Thread::Thread() : thread(0) {} + +// APR ================================================================ +#ifdef USE_APR + +Thread::Thread(Runnable* runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); +} + +void Thread::join(){ + apr_status_t status; + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + return Thread(thr); +} + +// POSIX ================================================================ +#else + +Thread::Thread(Runnable* runnable) { + CHECK(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +void Thread::join(){ + if (thread != 0) CHECK(pthread_join(thread, 0)); +} + +Thread::Thread(pthread_t thr) : thread(thr) {} + +Thread Thread::current() { + return Thread(pthread_self()); +} +#endif + +}} #endif /*!_sys_Thread_h*/ diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp new file mode 100644 index 0000000000..f06bb3c2e2 --- /dev/null +++ b/cpp/src/qpid/sys/Time.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Time.h" + +namespace qpid { +namespace sys { + +// APR ================================================================ +#if USE_APR + +Time Time::now() { + return Time(apr_time_now(), NSEC_PER_USEC); +} + +void Time::set(int64_t ticks, long nsec_per_tick) { + time = (ticks * nsec_per_tick) / NSEC_PER_USEC; +} + +int64_t Time::nsecs() const { + return time * NSEC_PER_USEC; +} + +// POSIX================================================================ +#else + +Time Time::now() { + Time t; + clock_gettime(CLOCK_REALTIME, &t.time); + return t; +} + +void Time::set(int64_t ticks, long nsec_per_tick) { + int64_t ns = ticks * nsec_per_tick; + time.tv_sec = ns / NSEC_PER_SEC; + time.tv_nsec = ns % NSEC_PER_SEC; +} + +int64_t Time::nsecs() const { + return time.tv_sec * NSEC_PER_SEC + time.tv_nsec; +} + +#endif +}} + diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 92e83116a5..6faaf5e367 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -21,17 +21,52 @@ #include <stdint.h> +#ifdef USE_APR +# include <apr-1/apr_time.h> +#else +# include <time.h> +#endif + namespace qpid { namespace sys { -inline int64_t msecsToNsecs(int64_t msecs) { return msecs * 1000 *1000; } -inline int64_t nsecsToMsecs(int64_t nsecs) { return nsecs / (1000 *1000); } +class Time +{ + public: + static Time now(); + + enum { + NSEC_PER_SEC=1000*1000*1000, + NSEC_PER_MSEC=1000*1000, + NSEC_PER_USEC=1000 + }; + + inline Time(int64_t ticks=0, long nsec_per_tick=1); + + void set(int64_t ticks, long nsec_per_tick=1); + + inline int64_t msecs() const; + inline int64_t usecs() const; + int64_t nsecs() const; + +#ifndef USE_APR + const struct timespec& getTimespec() const { return time; } + struct timespec& getTimespec() { return time; } +#endif + + private: +#ifdef USE_APR + apr_time_t time; +#else + struct timespec time; +#endif +}; + +Time::Time(int64_t ticks, long nsec_per_tick) { set(ticks, nsec_per_tick); } -/** Nanoseconds since epoch */ -int64_t getTimeNsecs(); +int64_t Time::msecs() const { return nsecs() / NSEC_PER_MSEC; } -/** Milliseconds since epoch */ -int64_t getTimeMsecs(); +int64_t Time::usecs() const { return nsecs() / NSEC_PER_USEC; } }} diff --git a/cpp/src/qpid/sys/doxygen_summary.h b/cpp/src/qpid/sys/doxygen_summary.h index af89154fdf..d7c513da50 100644 --- a/cpp/src/qpid/sys/doxygen_summary.h +++ b/cpp/src/qpid/sys/doxygen_summary.h @@ -22,13 +22,6 @@ // No code just a doxygen comment for the namespace /** \namspace qpid::sys - * IO classes used by client and broker. - * - * This namespace contains platform-neutral classes. Platform - * specific classes are in a sub-namespace named after the - * platform. At build time the appropriate platform classes are - * imported into this namespace so other code does not need to be awre - * of the difference. - * + * Portability wrappers for platform-specific details. */ #endif /*!_doxygen_summary_*/ diff --git a/cpp/src/qpid/sys/platform.h b/cpp/src/qpid/sys/platform.h deleted file mode 100644 index 878c724953..0000000000 --- a/cpp/src/qpid/sys/platform.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef _sys_platform_h -#define _sys_platform_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * Macros for including platform-specific headers and aliasing - * platform-specific classes into the qpid::sys namespace. - */ - -#define QPID_PLATFORM_H(HEADER) <qpid/PLATFORM/HEADER> - -#endif /*!_sys_platform_h*/ |
