diff options
Diffstat (limited to 'cpp/src/qpid/sys/apr')
| -rw-r--r-- | cpp/src/qpid/sys/apr/APRAcceptor.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Condition.h | 84 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFProcessor.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Module.h | 114 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Mutex.h | 72 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Socket.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Socket.h | 75 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Thread.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Thread.h | 93 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/Time.cpp | 36 |
11 files changed, 481 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp index dc021d2a3f..e9ce24ac2d 100644 --- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/Acceptor.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/Mutex.h" #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" diff --git a/cpp/src/qpid/sys/apr/Condition.h b/cpp/src/qpid/sys/apr/Condition.h new file mode 100644 index 0000000000..5e544219ab --- /dev/null +++ b/cpp/src/qpid/sys/apr/Condition.h @@ -0,0 +1,84 @@ +#ifndef _sys_apr_Condition_h +#define _sys_apr_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <sys/errno.h> +#include <boost/noncopyable.hpp> +#include <apr_thread_cond.h> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const AbsTime& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: + apr_thread_cond_t* condition; +}; + + +Condition::Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Condition::~Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Condition::wait(Mutex& mutex) { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + // APR uses microseconds. + apr_status_t status = + apr_thread_cond_timedwait( + condition, mutex.mutex, Duration(now(), absoluteTime)/TIME_USEC); + if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); + return status == 0; +} + +void Condition::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Condition::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +}} +#endif /*!_sys_apr_Condition_h*/ diff --git a/cpp/src/qpid/sys/apr/LFProcessor.cpp b/cpp/src/qpid/sys/apr/LFProcessor.cpp index 0d8ac425fe..9e139c874c 100644 --- a/cpp/src/qpid/sys/apr/LFProcessor.cpp +++ b/cpp/src/qpid/sys/apr/LFProcessor.cpp @@ -20,6 +20,7 @@ */ #include <sstream> #include "qpid/QpidError.h" +#include "qpid/sys/Mutex.h" #include "LFProcessor.h" #include "APRBase.h" #include "LFSessionContext.h" diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h index ed97b23645..3c90c4a381 100644 --- a/cpp/src/qpid/sys/apr/LFSessionContext.h +++ b/cpp/src/qpid/sys/apr/LFSessionContext.h @@ -30,6 +30,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/Buffer.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" diff --git a/cpp/src/qpid/sys/apr/Module.h b/cpp/src/qpid/sys/apr/Module.h new file mode 100644 index 0000000000..d77cc0f388 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Module.h @@ -0,0 +1,114 @@ +#ifndef _sys_apr_Module_h +#define _sys_apr_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/QpidError.h" +#include "APRBase.h" +#include "APRPool.h" + +#include <boost/noncopyable.hpp> +#include <iostream> +#include <apr_dso.h> + +namespace qpid { +namespace sys { + +typedef apr_dso_handle_t* dso_handle_t; + +template <class T> class Module : private boost::noncopyable +{ + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + +public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); +}; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +template <class T> void Module<T>::load(const std::string& name) +{ + CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); +} + +template <class T> void Module<T>::unload() +{ + CHECK_APR_SUCCESS(apr_dso_unload(handle)); +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + apr_dso_handle_sym_t symbol; + CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); + return (void*) symbol; +} + +}} +#endif //ifndef _sys_apr_Module_h + diff --git a/cpp/src/qpid/sys/apr/Mutex.h b/cpp/src/qpid/sys/apr/Mutex.h new file mode 100644 index 0000000000..700b5b910b --- /dev/null +++ b/cpp/src/qpid/sys/apr/Mutex.h @@ -0,0 +1,72 @@ +#ifndef _sys_apr_Mutex_h +#define _sys_apr_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRBase.h" +#include "APRPool.h" + +#include <boost/noncopyable.hpp> +#include <apr_thread_mutex.h> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + public: + typedef ScopedLock<Mutex> ScopedLock; + typedef ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: + apr_thread_mutex_t* mutex; + friend class Condition; +}; + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +}} +#endif /*!_sys_apr_Mutex_h*/ diff --git a/cpp/src/qpid/sys/apr/Socket.cpp b/cpp/src/qpid/sys/apr/Socket.cpp index 8cd98161d2..6e64d656d2 100644 --- a/cpp/src/qpid/sys/apr/Socket.cpp +++ b/cpp/src/qpid/sys/apr/Socket.cpp @@ -20,7 +20,7 @@ */ -#include "qpid/sys/Socket.h" +#include "Socket.h" #include "APRBase.h" #include "APRPool.h" @@ -40,7 +40,7 @@ Socket::Socket(apr_socket_t* s) { socket = s; } -void Socket::setTimeout(Time interval) { +void Socket::setTimeout(const Duration& interval) { apr_socket_timeout_set(socket, interval/TIME_USEC); } diff --git a/cpp/src/qpid/sys/apr/Socket.h b/cpp/src/qpid/sys/apr/Socket.h new file mode 100644 index 0000000000..c20c36dcd9 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Socket.h @@ -0,0 +1,75 @@ +#ifndef _sys_apr_Socket_h +#define _sys_apr_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/sys/Time.h" + +#include <apr_network_io.h> + +namespace qpid { +namespace sys { + +class Socket +{ + public: + /** Create an initialized TCP socket */ + static Socket createTcp(); + + /** Create a socket wrapper for descriptor. */ + Socket(apr_socket_t* descriptor = 0); + + /** Set timeout for read and write */ + void setTimeout(const Duration& interval); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + + /** Get file descriptor */ + int fd(); + + private: + apr_socket_t* socket; +}; + +}} +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/apr/Thread.cpp b/cpp/src/qpid/sys/apr/Thread.cpp index 0f9aeca186..3369ef7eb1 100644 --- a/cpp/src/qpid/sys/apr/Thread.cpp +++ b/cpp/src/qpid/sys/apr/Thread.cpp @@ -19,7 +19,8 @@ * */ -#include "qpid/sys/Thread.h" +#include "Thread.h" +#include "qpid/sys/Runnable.h" using namespace qpid::sys; using qpid::sys::Runnable; diff --git a/cpp/src/qpid/sys/apr/Thread.h b/cpp/src/qpid/sys/apr/Thread.h new file mode 100644 index 0000000000..ce876efbdf --- /dev/null +++ b/cpp/src/qpid/sys/apr/Thread.h @@ -0,0 +1,93 @@ +#ifndef _sys_apr_Thread_h +#define _sys_apr_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "APRBase.h" + +#include <apr_thread_proc.h> +#include <apr_portable.h> + +namespace qpid { +namespace sys { + +class Runnable; + +class Thread +{ + public: + inline static Thread current(); + inline static void yield(); + + inline Thread(); + inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + + inline void join(); + + inline long id(); + + private: + static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); + inline Thread(apr_thread_t* t); + apr_thread_t* thread; +}; + +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); +} + +Thread::Thread(Runnable& runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); +} + +void Thread::join(){ + apr_status_t status; + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +long Thread::id() { + return long(thread); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + return Thread(thr); +} + +void Thread::yield() +{ + apr_thread_yield(); +} + +}} +#endif /*!_sys_apr_Thread_h*/ diff --git a/cpp/src/qpid/sys/apr/Time.cpp b/cpp/src/qpid/sys/apr/Time.cpp new file mode 100644 index 0000000000..34e740b144 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Time.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" + +#include <apr_time.h> + +namespace qpid { +namespace sys { + +AbsTime AbsTime::now() { + AbsTime time_now; + time_now.time_ns = apr_time_now() * TIME_USEC; + return time_now; +} + +}} + |
