diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-05-22 15:18:08 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-05-22 15:18:08 +0000 |
| commit | f646350b5e59ccf49f1253bd55f98d062769f2ee (patch) | |
| tree | ba8143aa842ced96eaa450cc236a96abdd8b9c05 /cpp/src/qpid/sys/posix | |
| parent | b8f00ac2a358a02d0cdae2dc098f2bacb2af44d5 (diff) | |
| download | qpid-python-f646350b5e59ccf49f1253bd55f98d062769f2ee.tar.gz | |
* Split apart platform (threading etc.) from network io
you can now use a posix platform implementation by configuring
--disable-apr-platform
* Changed Time classes to distinguish between absolute times (AbsTime)
and durations (Duration). This should avoid bugs caused by confusing
the two types of time.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540608 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix')
| -rw-r--r-- | cpp/src/qpid/sys/posix/Condition.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/EventChannelConnection.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Module.h | 126 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Mutex.h | 127 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PrivatePosix.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Socket.h | 76 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Thread.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Thread.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Time.cpp | 57 |
11 files changed, 616 insertions, 17 deletions
diff --git a/cpp/src/qpid/sys/posix/Condition.h b/cpp/src/qpid/sys/posix/Condition.h new file mode 100644 index 0000000000..1c8d1a80b1 --- /dev/null +++ b/cpp/src/qpid/sys/posix/Condition.h @@ -0,0 +1,86 @@ +#ifndef _sys_posix_Condition_h +#define _sys_posix_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PrivatePosix.h" + +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <time.h> +#include <sys/errno.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const AbsTime& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: + pthread_cond_t condition; +}; + +Condition::Condition() { + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); +} + +Condition::~Condition() { + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); +} + +void Condition::wait(Mutex& mutex) { + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + struct timespec ts; + toTimespec(ts, Duration(absoluteTime)); + int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); + } + return true; +} + +void Condition::notify(){ + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +} + +void Condition::notifyAll(){ + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); +} + +}} +#endif /*!_sys_posix_Condition_h*/ diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index 2676985cc6..d5a2c238d9 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -51,9 +51,9 @@ class EventChannelAcceptor : public Acceptor { int16_t port_, int backlog, int nThreads, bool trace_ ); - int getPort() const; + uint16_t getPort() const; - void run(ConnectionInputHandlerFactory& factory); + void run(ConnectionInputHandlerFactory* factory); void shutdown(); @@ -96,17 +96,17 @@ EventChannelAcceptor::EventChannelAcceptor( threads(EventChannelThreads::create(EventChannel::create(), nThreads)) { } -int EventChannelAcceptor::getPort() const { +uint16_t EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } -void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { +void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { { Mutex::ScopedLock l(lock); if (!isRunning && !isShutdown) { isRunning = true; - factory = &f; - threads->post(acceptEvent); + factory = f; + threads->postEvent(acceptEvent); } } threads->join(); // Wait for shutdown. @@ -143,7 +143,7 @@ void EventChannelAcceptor::accept() int fd = acceptEvent.getAcceptedDesscriptor(); connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->post(acceptEvent); // Keep accepting. + threads->postEvent(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 9a7d53fa8a..73e617ea83 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -140,7 +140,7 @@ void EventChannelConnection::startWrite() { writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->post(writeEvent); + threads->postEvent(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -180,7 +180,7 @@ void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( readFd, in.start(), in.available(), readCallback,true); - threads->post(readEvent); + threads->postEvent(readEvent); } // Completion of initial read, expect protocolInit. diff --git a/cpp/src/qpid/sys/posix/Module.h b/cpp/src/qpid/sys/posix/Module.h new file mode 100644 index 0000000000..af3d6ac6ef --- /dev/null +++ b/cpp/src/qpid/sys/posix/Module.h @@ -0,0 +1,126 @@ +#ifndef _sys_posix_Module_h +#define _sys_posix_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/QpidError.h" + +#include <boost/noncopyable.hpp> +#include <iostream> +#include <dlfcn.h> + +namespace qpid { +namespace sys { + +typedef void* dso_handle_t; + +template <class T> class Module : private boost::noncopyable +{ + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + +public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); +}; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +template <class T> void Module<T>::load(const std::string& name) +{ + dlerror(); + handle = dlopen(name.c_str(), RTLD_NOW); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void Module<T>::unload() +{ + dlerror(); + dlclose(handle); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + dlerror(); + void* sym = dlsym(handle, name.c_str()); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } + return sym; +} + +}} +#endif //ifndef _sys_posix_Module_h + diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h new file mode 100644 index 0000000000..b278c6b14a --- /dev/null +++ b/cpp/src/qpid/sys/posix/Mutex.h @@ -0,0 +1,127 @@ +#ifndef _sys_posix_Mutex_h +#define _sys_posix_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "check.h" + +#include <pthread.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + friend class Condition; + +public: + typedef ScopedLock<Mutex> ScopedLock; + typedef ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + +protected: + pthread_mutex_t mutex; +}; + +/** + * Initialise a recursive mutex attr for use in creating mutexes later + * (we use pthread_once to make sure it is initialised exactly once) + */ +namespace { + pthread_once_t onceControl = PTHREAD_ONCE_INIT; + pthread_mutexattr_t mutexattr; + + void initMutexattr() { + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); + } + + struct RecursiveMutexattr { + RecursiveMutexattr() { + pthread_once(&onceControl, initMutexattr); + } + + operator const pthread_mutexattr_t*() const { + return &mutexattr; + } + }; + + RecursiveMutexattr recursiveMutexattr; +} + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + +void PODMutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +Mutex::Mutex() { + QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); +} + +Mutex::~Mutex(){ + QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +}} +#endif /*!_sys_posix_Mutex_h*/ diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h new file mode 100644 index 0000000000..2707057ef0 --- /dev/null +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -0,0 +1,39 @@ +#ifndef _sys_posix_PrivatePosix_h +#define _sys_posix_PrivatePosix_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" + +struct timespec; +struct timeval; + +namespace qpid { +namespace sys { + +struct timespec& toTimespec(struct timespec& ts, const Duration& t); +struct timeval& toTimeval(struct timeval& tv, const Duration& t); +Duration toTime(const struct timespec& ts); + +}} + +#endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index e9bd2eeb6b..39651fa821 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -19,6 +19,12 @@ * */ +#include "qpid/sys/Socket.h" + +#include "qpid/QpidError.h" +#include "check.h" +#include "PrivatePosix.h" + #include <sys/socket.h> #include <sys/errno.h> #include <netinet/in.h> @@ -26,10 +32,6 @@ #include <boost/format.hpp> -#include "qpid/QpidError.h" -#include "check.h" -#include "qpid/sys/Socket.h" - using namespace qpid::sys; Socket Socket::createTcp() @@ -41,11 +43,10 @@ Socket Socket::createTcp() Socket::Socket(int descriptor) : socket(descriptor) {} -void Socket::setTimeout(Time interval) +void Socket::setTimeout(const Duration& interval) { struct timeval tv; - tv.tv_sec = interval/TIME_SEC; - tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; + toTimeval(tv, interval); setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } diff --git a/cpp/src/qpid/sys/posix/Socket.h b/cpp/src/qpid/sys/posix/Socket.h new file mode 100644 index 0000000000..614221354f --- /dev/null +++ b/cpp/src/qpid/sys/posix/Socket.h @@ -0,0 +1,76 @@ +#ifndef _sys_posix_Socket_h +#define _sys_posix_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/sys/Time.h" + +namespace qpid { +namespace sys { + +class Socket +{ + public: + /** Create an initialized TCP socket */ + static Socket createTcp(); + + /** Create a socket wrapper for descriptor. */ + Socket(int descriptor = 0); + + /** Set timeout for read and write */ + void setTimeout(const Duration& interval); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + + /** Get file descriptor */ + int fd(); + + private: + void init() const; + mutable int socket; // Initialized on demand. +}; + +}} + + +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/posix/Thread.cpp b/cpp/src/qpid/sys/posix/Thread.cpp index fc85e35028..dc9b21448f 100644 --- a/cpp/src/qpid/sys/posix/Thread.cpp +++ b/cpp/src/qpid/sys/posix/Thread.cpp @@ -19,7 +19,8 @@ * */ -#include "qpid/sys/Thread.h" +#include "Thread.h" +#include "qpid/sys/Runnable.h" void* qpid::sys::Thread::runRunnable(void* p) { diff --git a/cpp/src/qpid/sys/posix/Thread.h b/cpp/src/qpid/sys/posix/Thread.h new file mode 100644 index 0000000000..9de7299f5a --- /dev/null +++ b/cpp/src/qpid/sys/posix/Thread.h @@ -0,0 +1,86 @@ +#ifndef _sys_posix_Thread_h +#define _sys_posix_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "check.h" +#include <pthread.h> + +namespace qpid { +namespace sys { + +class Runnable; + +class Thread +{ + public: + inline static Thread current(); + inline static void yield(); + + inline Thread(); + inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + + inline void join(); + + inline long id(); + + private: + static void* runRunnable(void* runnable); + inline Thread(pthread_t); + pthread_t thread; +}; + + +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); +} + +void Thread::join(){ + if (thread != 0) + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { + return long(thread); +} + +Thread::Thread(pthread_t thr) : thread(thr) {} + +Thread Thread::current() { + return Thread(pthread_self()); +} + +void Thread::yield() +{ + QPID_POSIX_THROW_IF(pthread_yield()); +} + + +}} +#endif /*!_sys_posix_Thread_h*/ diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp new file mode 100644 index 0000000000..2228caea58 --- /dev/null +++ b/cpp/src/qpid/sys/posix/Time.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PrivatePosix.h" + +#include "qpid/sys/Time.h" + +#include <time.h> +#include <sys/time.h> + +namespace qpid { +namespace sys { + +AbsTime AbsTime::now() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + AbsTime time_now; + time_now.time_ns = toTime(ts).nanosecs; + return time_now; +} + +struct timespec& toTimespec(struct timespec& ts, const Duration& t) { + ts.tv_sec = t / TIME_SEC; + ts.tv_nsec = t % TIME_SEC; + return ts; +} + +struct timeval& toTimeval(struct timeval& tv, const Duration& t) { + tv.tv_sec = t/TIME_SEC; + tv.tv_usec = (t%TIME_SEC)/TIME_USEC; + return tv; +} + +Duration toTime(const struct timespec& ts) { + return ts.tv_sec*TIME_SEC + ts.tv_nsec; +} + +}} + |
