diff options
| author | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
| commit | fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch) | |
| tree | a2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/common/sys | |
| parent | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff) | |
| download | qpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz | |
2006-12-01  Jim Meyering  <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy:
  - adds autoconf, automake, libtool support
  - makes the hierarchy flatter and renames a few files (e.g., Queue.h,
  Queue.cpp) that appeared twice, once under client/ and again under broker/.
In the process, I've changed many #include directives, mostly
to remove a qpid/ or qpid/framing/ prefix from the file name argument.
Although most changes were to .cpp and .h files under qpid/cpp/, there
were also several to template files under qpid/gentools, and even one
to CppGenerator.java.
Nearly all files are moved to a new position in the hierarchy.
The new hierarchy looks like this:
  src               # this is the new home of qpidd.cpp
  tests             # all tests are here.  See Makefile.am.
  gen               # As before, all generated files go here.
  lib               # This is just a container for the 3 lib dirs:
  lib/client
  lib/broker
  lib/common
  lib/common/framing
  lib/common/sys
  lib/common/sys/posix
  lib/common/sys/apr
  build-aux
  m4
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common/sys')
38 files changed, 3374 insertions, 0 deletions
| diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h new file mode 100644 index 0000000000..e6bc27a593 --- /dev/null +++ b/cpp/lib/common/sys/Acceptor.h @@ -0,0 +1,47 @@ +#ifndef _sys_Acceptor_h +#define _sys_Acceptor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdint.h> +#include <SharedObject.h> + +namespace qpid { +namespace sys { + +class SessionHandlerFactory; + +class Acceptor : public qpid::SharedObject<Acceptor> +{ +  public: +    static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); +    virtual ~Acceptor() = 0; +    virtual int16_t getPort() const = 0; +    virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; +    virtual void shutdown() = 0; +}; + +}} + + +     +#endif  /*!_sys_Acceptor_h*/ diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h new file mode 100644 index 0000000000..b625b2c9b0 --- /dev/null +++ b/cpp/lib/common/sys/AtomicCount.h @@ -0,0 +1,71 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { +  public: +    class ScopedDecrement : boost::noncopyable { +      public: +        /** Decrement counter in constructor and increment in destructor. */ +        ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } +        ~ScopedDecrement() { ++count; } +        /** Return the value returned by the decrement. */ +        operator long() { return value; } +      private: +        AtomicCount& count; +        long value; +    }; + +    class ScopedIncrement : boost::noncopyable { +      public: +        /** Increment counter in constructor and increment in destructor. */ +        ScopedIncrement(AtomicCount& c) : count(c) { ++count; } +        ~ScopedIncrement() { --count; } +      private: +        AtomicCount& count; +    }; + +    AtomicCount(long value = 0) : count(value) {} +     +    void operator++() { ++count ; } +     +    long operator--() { return --count; } +     +    operator long() const { return count; } + +     +  private: +    boost::detail::atomic_count  count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h new file mode 100644 index 0000000000..9bf5d6e1fc --- /dev/null +++ b/cpp/lib/common/sys/Module.h @@ -0,0 +1,161 @@ +#ifndef _sys_Module_h +#define _sys_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <iostream> +#include <QpidError.h> + +namespace qpid { +namespace sys { +#if USE_APR +#include <apr_dso.h> +    typedef apr_dso_handle_t* dso_handle_t; +#else  +    typedef void* dso_handle_t; +#endif + +    template <class T> class Module : private boost::noncopyable +    { +        typedef T* create_t(); +        typedef void destroy_t(T*); +         +        dso_handle_t handle; +        destroy_t* destroy; +        T* ptr; + +        void load(const std::string& name); +        void unload(); +        void* getSymbol(const std::string& name); + +    public: +        Module(const std::string& name); +        T* operator->();  +        T* get();  +        ~Module() throw(); +    }; + +} +} + +using namespace qpid::sys; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0)  +{ +    load(module); +    //TODO: need a better strategy for symbol names to allow multiple +    //modules to be loaded without clashes... + +    //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic +    create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); +    destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); +    ptr = create(); +} + +template <class T> T* Module<T>::operator->()  +{  +    return ptr;  +} + +template <class T> T* Module<T>::get()  +{  +    return ptr;  +} + +template <class T> Module<T>::~Module() throw() +{ +    try { +        if (handle && ptr) { +            destroy(ptr); +        } +        if (handle) unload(); +    } catch (std::exception& e) { +        std::cout << "Error while destroying module: " << e.what() << std::endl; +    } +    destroy = 0; +    handle = 0; +    ptr = 0; +} + +// APR ================================================================ +#if USE_APR + +#include <apr/APRBase.h> +#include <apr/APRPool.h> + +template <class T> void Module<T>::load(const std::string& name) +{ +    CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); +} + +template <class T> void Module<T>::unload() +{ +    CHECK_APR_SUCCESS(apr_dso_unload(handle)); +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ +    apr_dso_handle_sym_t symbol; +    CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); +    return (void*) symbol; +} + +// POSIX================================================================ +#else  + +#include <dlfcn.h> + +template <class T> void Module<T>::load(const std::string& name) +{ +    dlerror(); +    handle = dlopen(name.c_str(), RTLD_NOW); +    const char* error = dlerror(); +    if (error) { +        THROW_QPID_ERROR(INTERNAL_ERROR, error); +    } +} + +template <class T> void Module<T>::unload() +{ +    dlerror(); +    dlclose(handle); +    const char* error = dlerror(); +    if (error) { +        THROW_QPID_ERROR(INTERNAL_ERROR, error); +    } +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ +    dlerror(); +    void* sym = dlsym(handle, name.c_str()); +    const char* error = dlerror(); +    if (error) { +        THROW_QPID_ERROR(INTERNAL_ERROR, error); +    } +    return sym; +} + +#endif //if USE_APR + +#endif //ifndef _sys_Module_h + diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h new file mode 100644 index 0000000000..e58931e699 --- /dev/null +++ b/cpp/lib/common/sys/Monitor.h @@ -0,0 +1,127 @@ +#ifndef _sys_Monitor_h +#define _sys_Monitor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/errno.h> +#include <boost/noncopyable.hpp> +#include <sys/Mutex.h> +#include <sys/Time.h> + +#ifdef USE_APR +#  include <apr_thread_cond.h> +#endif + +namespace qpid { +namespace sys { + +/** + * A monitor is a condition variable and a mutex + */ +class Monitor : public Mutex +{ +  public: +    inline Monitor(); +    inline ~Monitor(); +    inline void wait(); +    inline bool wait(const Time& absoluteTime); +    inline void notify(); +    inline void notifyAll(); + +  private: +#ifdef USE_APR +    apr_thread_cond_t* condition; +#else +    pthread_cond_t condition; +#endif +}; + + +// APR ================================================================ +#ifdef USE_APR + +Monitor::Monitor() { +    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor() { +    CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { +    CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +bool Monitor::wait(const Time& absoluteTime){ +    // APR uses microseconds. +    apr_status_t status = +        apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC); +    if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); +    return status == 0; +} + +void Monitor::notify(){ +    CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ +    CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +#else +// POSIX ================================================================ + +Monitor::Monitor() { +    QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); +} + +Monitor::~Monitor() { +    QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); +} + +void Monitor::wait() { +    QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex)); +} + +bool Monitor::wait(const Time& absoluteTime){ +    struct timespec ts; +    toTimespec(ts, absoluteTime); +    int status = pthread_cond_timedwait(&condition, &mutex, &ts); +    if (status != 0) { +        if (status == ETIMEDOUT) return false; +        throw QPID_POSIX_ERROR(status); +    } +    return true; +} + +void Monitor::notify(){ +    QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +} + +void Monitor::notifyAll(){ +    QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); +} +#endif  /*USE_APR*/ + + +}} +#endif  /*!_sys_Monitor_h*/ diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h new file mode 100644 index 0000000000..4022da2f6e --- /dev/null +++ b/cpp/lib/common/sys/Mutex.h @@ -0,0 +1,151 @@ +#ifndef _sys_Mutex_h +#define _sys_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifdef USE_APR +#  include <apr_thread_mutex.h> +#  include <apr/APRBase.h> +#  include <apr/APRPool.h> +#else +#  include <pthread.h> +#  include <posix/check.h> +#endif +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Scoped lock template: calls lock() in ctor, unlock() in dtor. + * L can be any class with lock() and unlock() functions. + */ +template <class L> +class ScopedLock +{ +  public: +    ScopedLock(L& l) : mutex(l) { l.lock(); } +    ~ScopedLock() { mutex.unlock(); } +  private: +    L& mutex; +}; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { +  public: +    typedef ScopedLock<Mutex> ScopedLock; +     +    inline Mutex(); +    inline ~Mutex(); +    inline void lock(); +    inline void unlock(); +    inline void trylock(); + +  protected: +#ifdef USE_APR +    apr_thread_mutex_t* mutex; +#else +    pthread_mutex_t mutex; +#endif +}; + +#ifdef USE_APR +// APR ================================================================ + +Mutex::Mutex() { +    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ +    CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { +    CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { +    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { +    CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +#else +// POSIX ================================================================ + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex  +{ +    typedef ScopedLock<PODMutex> ScopedLock; + +    inline void lock(); +    inline void unlock(); +    inline void trylock(); + +    // Must be public to be a POD: +    pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + + +void PODMutex::lock() { +    QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { +    QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { +    QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + + +Mutex::Mutex() { +    QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); +} + +Mutex::~Mutex(){ +    QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { +    QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { +    QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { +    QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +#endif // USE_APR + +}} + + + +#endif  /*!_sys_Mutex_h*/ diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp new file mode 100644 index 0000000000..30122c682f --- /dev/null +++ b/cpp/lib/common/sys/Runnable.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Runnable.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +Runnable::~Runnable() {} + +Runnable::Functor Runnable::functor()  +{ +    return boost::bind(&Runnable::run, this); +} + +}} diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h new file mode 100644 index 0000000000..fb3927c612 --- /dev/null +++ b/cpp/lib/common/sys/Runnable.h @@ -0,0 +1,50 @@ +#ifndef _Runnable_ +#define _Runnable_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/function.hpp> + +namespace qpid { +namespace sys { + +/** + * Interface for objects that can be run, e.g. in a thread. + */ +class Runnable +{ +  public: +    /** Type to represent a runnable as a Functor */ +    typedef boost::function0<void> Functor; +     +    virtual ~Runnable(); + +    /** Derived classes override run(). */ +    virtual void run() = 0; + +    /** Create a functor object that will call this->run(). */ +    Functor functor(); +}; + +}} + + +#endif diff --git a/cpp/lib/common/sys/SessionContext.h b/cpp/lib/common/sys/SessionContext.h new file mode 100644 index 0000000000..671e00774f --- /dev/null +++ b/cpp/lib/common/sys/SessionContext.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionContext_ +#define _SessionContext_ + +#include <OutputHandler.h> + +namespace qpid { +namespace sys { + +/** + * Provides the output handler associated with a connection. + */ +class SessionContext : public virtual qpid::framing::OutputHandler  +{ +  public: +    virtual void close() = 0; +}; + +}} + + +#endif diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/SessionHandler.h new file mode 100644 index 0000000000..76f79d421d --- /dev/null +++ b/cpp/lib/common/sys/SessionHandler.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include <InputHandler.h> +#include <InitiationHandler.h> +#include <ProtocolInitiation.h> +#include <sys/TimeoutHandler.h> + +namespace qpid { +namespace sys { + +    class SessionHandler : +        public qpid::framing::InitiationHandler, +        public qpid::framing::InputHandler,  +        public TimeoutHandler +    { +    public: +        virtual void closed() = 0; +    }; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/SessionHandlerFactory.h new file mode 100644 index 0000000000..2a01aebcb0 --- /dev/null +++ b/cpp/lib/common/sys/SessionHandlerFactory.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerFactory_ +#define _SessionHandlerFactory_ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class SessionContext; +class SessionHandler; + +/** + * Callback interface used by the Acceptor to + * create a SessionHandler for each new connection. + */ +class SessionHandlerFactory : private boost::noncopyable +{ +  public: +    virtual SessionHandler* create(SessionContext* ctxt) = 0; +    virtual ~SessionHandlerFactory(){} +}; + +}} + + +#endif diff --git a/cpp/lib/common/sys/ShutdownHandler.h b/cpp/lib/common/sys/ShutdownHandler.h new file mode 100644 index 0000000000..88baecb5b6 --- /dev/null +++ b/cpp/lib/common/sys/ShutdownHandler.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ShutdownHandler_ +#define _ShutdownHandler_ + +namespace qpid { +namespace sys { + +    class ShutdownHandler +    { +    public: +	virtual void shutdown() = 0; +	virtual ~ShutdownHandler(){} +    }; + +} +} + +#endif diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h new file mode 100644 index 0000000000..d793a240c6 --- /dev/null +++ b/cpp/lib/common/sys/Socket.h @@ -0,0 +1,88 @@ +#ifndef _sys_Socket_h +#define _sys_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <sys/Time.h> + +#ifdef USE_APR +#  include <apr_network_io.h> +#endif + +namespace qpid { +namespace sys { + +class Socket +{ +  public: +    /** Create an initialized TCP socket */ +    static Socket createTcp(); + +    /** Create a socket wrapper for descriptor. */ +#ifdef USE_APR +    Socket(apr_socket_t* descriptor = 0); +#else +    Socket(int descriptor = 0); +#endif +     +    /** Set timeout for read and write */ +    void setTimeout(Time interval); + +    void connect(const std::string& host, int port); + +    void close(); + +    enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + +    /** Returns bytes sent or an ErrorCode value < 0. */ +    ssize_t send(const void* data, size_t size); + +    /** +     * Returns bytes received, an ErrorCode value < 0 or 0 +     * if the connection closed in an orderly manner. +     */ +    ssize_t recv(void* data, size_t size); + +    /** Bind to a port and start listening. +     *@param port 0 means choose an available port. +     *@param backlog maximum number of pending connections. +     *@return The bound port. +     */ +    int listen(int port = 0, int backlog = 10); + +    /** Get file descriptor */ +    int fd();  +     +  private: +#ifdef USE_APR     +    apr_socket_t* socket; +#else +    void init() const; +    mutable int socket;         // Initialized on demand.  +#endif +}; + +}} + + +#endif  /*!_sys_Socket_h*/ diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h new file mode 100644 index 0000000000..47b95b6234 --- /dev/null +++ b/cpp/lib/common/sys/Thread.h @@ -0,0 +1,142 @@ +#ifndef _sys_Thread_h +#define _sys_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Runnable.h> + +#ifdef USE_APR +#  include <apr_thread_proc.h> +#  include <apr_portable.h> +#  include <apr/APRPool.h> +#  include <apr/APRBase.h> +#else +#  include <posix/check.h> +#  include <pthread.h> +#endif + +namespace qpid { +namespace sys { + +class Thread +{ +  public: +    inline static Thread current(); +    inline static void yield(); + +    inline Thread(); +    inline explicit Thread(qpid::sys::Runnable*); +    inline explicit Thread(qpid::sys::Runnable&); +     +    inline void join(); + +    inline long id(); +         +  private: +#ifdef USE_APR +    static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); +    inline Thread(apr_thread_t* t); +    apr_thread_t* thread; +#else +    static void* runRunnable(void* runnable); +    inline Thread(pthread_t); +    pthread_t thread; +#endif     +}; + + +Thread::Thread() : thread(0) {} + +// APR ================================================================ +#ifdef USE_APR + +Thread::Thread(Runnable* runnable) { +    CHECK_APR_SUCCESS( +        apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); +} + +Thread::Thread(Runnable& runnable) { +    CHECK_APR_SUCCESS( +        apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); +} + +void Thread::join(){ +    apr_status_t status; +    if (thread != 0)  +        CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +long Thread::id() { +    return long(thread); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ +    apr_thread_t* thr; +    apr_os_thread_t osthr = apr_os_thread_current(); +    CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); +    return Thread(thr); +} + +void Thread::yield()  +{ +    apr_thread_yield(); +} + + +// POSIX ================================================================ +#else + +Thread::Thread(Runnable* runnable) { +    QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { +    QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); +} + +void Thread::join(){ +    QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { +    return long(thread); +} + +Thread::Thread(pthread_t thr) : thread(thr) {} + +Thread Thread::current() { +    return Thread(pthread_self()); +} + +void Thread::yield()  +{ +    QPID_POSIX_THROW_IF(pthread_yield()); +} + + +#endif + +}} + +#endif  /*!_sys_Thread_h*/ diff --git a/cpp/lib/common/sys/Time.cpp b/cpp/lib/common/sys/Time.cpp new file mode 100644 index 0000000000..ad6185b966 --- /dev/null +++ b/cpp/lib/common/sys/Time.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Time.h" + +namespace qpid { +namespace sys { + +// APR ================================================================ +#if USE_APR + +Time now() { return apr_time_now() * TIME_USEC; } + +// POSIX================================================================ +#else  + +Time now() { +    struct timespec ts; +    clock_gettime(CLOCK_REALTIME, &ts); +    return toTime(ts); +} + +struct timespec toTimespec(const Time& t) { +    struct timespec ts; +    toTimespec(ts, t); +    return ts; +} + +struct timespec& toTimespec(struct timespec& ts, const Time& t) { +    ts.tv_sec  = t / TIME_SEC; +    ts.tv_nsec = t % TIME_SEC; +    return  ts; +} + +Time toTime(const struct timespec& ts) { +    return ts.tv_sec*TIME_SEC + ts.tv_nsec; +} + + +#endif +}} + diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h new file mode 100644 index 0000000000..3dd46741d8 --- /dev/null +++ b/cpp/lib/common/sys/Time.h @@ -0,0 +1,58 @@ +#ifndef _sys_Time_h +#define _sys_Time_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdint.h> + +#ifdef USE_APR +#  include <apr_time.h> +#else +#  include <time.h> +#endif + +namespace qpid { +namespace sys { + +/** Time in nanoseconds */ +typedef int64_t Time; + +Time now(); + +/** Nanoseconds per second. */ +const Time TIME_SEC  = 1000*1000*1000; +/** Nanoseconds per millisecond */ +const Time TIME_MSEC = 1000*1000; +/** Nanoseconds per microseconds. */ +const Time TIME_USEC = 1000; +/** Nanoseconds per nanosecond. */ +const Time TIME_NSEC = 1; + +#ifndef USE_APR +struct timespec toTimespec(const Time& t); +struct timespec& toTimespec(struct timespec& ts, const Time& t); +Time toTime(const struct timespec& ts); +#endif + +}} + +#endif  /*!_sys_Time_h*/ diff --git a/cpp/lib/common/sys/TimeoutHandler.h b/cpp/lib/common/sys/TimeoutHandler.h new file mode 100644 index 0000000000..0c10709bbf --- /dev/null +++ b/cpp/lib/common/sys/TimeoutHandler.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TimeoutHandler_ +#define _TimeoutHandler_ + +namespace qpid { +namespace sys { + +    class TimeoutHandler +    { +    public: +	virtual void idleOut() = 0; +	virtual void idleIn() = 0; +	virtual ~TimeoutHandler(){} +    }; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp new file mode 100644 index 0000000000..c998b33625 --- /dev/null +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sys/Acceptor.h> +#include <sys/SessionHandlerFactory.h> +#include "LFProcessor.h" +#include "LFSessionContext.h" +#include "APRBase.h" +#include "APRPool.h" + +namespace qpid { +namespace sys { + +class APRAcceptor : public Acceptor +{ +  public: +    APRAcceptor(int16_t port, int backlog, int threads, bool trace); +    virtual int16_t getPort() const; +    virtual void run(qpid::sys::SessionHandlerFactory* factory); +    virtual void shutdown(); + +  private: +    int16_t port; +    bool trace; +    LFProcessor processor; +    apr_socket_t* socket; +    volatile bool running; +}; + +// Define generic Acceptor::create() to return APRAcceptor. +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) +{ +    return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); +} +// Must define Acceptor virtual dtor. +Acceptor::~Acceptor() {} + +    APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : +    port(port_), +    trace(trace_), +    processor(APRPool::get(), threads, 1000, 5000000) +{ +    apr_sockaddr_t* address; +    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); +    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); +    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); +    CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); +    CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t APRAcceptor::getPort() const { +    apr_sockaddr_t* address; +    CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); +    return address->port; +} + +void APRAcceptor::run(SessionHandlerFactory* factory) { +    running = true; +    processor.start(); +    std::cout << "Listening on port " << getPort() << "..." << std::endl; +    while(running){ +        apr_socket_t* client; +        apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); +        if(status == APR_SUCCESS){ +            //make this socket non-blocking: +            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); +            LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); +            session->init(factory->create(session)); +        }else{ +            running = false; +            if(status != APR_EINTR){ +                std::cout << "ERROR: " << get_desc(status) << std::endl; +            } +        } +    } +    shutdown(); +} + +void APRAcceptor::shutdown() { +    // TODO aconway 2006-10-12: Cleanup, this is not thread safe. +    if (running) { +        running = false; +        processor.stop(); +        CHECK_APR_SUCCESS(apr_socket_close(socket)); +    } +} + + +}} diff --git a/cpp/lib/common/sys/apr/APRBase.cpp b/cpp/lib/common/sys/apr/APRBase.cpp new file mode 100644 index 0000000000..19a1b93103 --- /dev/null +++ b/cpp/lib/common/sys/apr/APRBase.cpp @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <QpidError.h> +#include "APRBase.h" + +using namespace qpid::sys; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ +    if(instance == 0){ +	instance = new APRBase(); +    } +    return instance; +} + + +APRBase::APRBase() : count(0){ +    apr_initialize(); +    CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); +    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ +    CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +    apr_pool_destroy(pool); +    apr_terminate();   +} + +bool APRBase::_increment(){ +    bool deleted(false); +    CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +    if(this == instance){ +	count++; +    }else{ +	deleted = true; +    } +    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +    return !deleted; +} + +void APRBase::_decrement(){ +    APRBase* copy = 0; +    CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +    if(--count == 0){ +	copy = instance; +	instance = 0; +    } +    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +    if(copy != 0){ +	delete copy; +    } +} + +void APRBase::increment(){ +    int count = 0; +    while(count++ < 2 && !getInstance()->_increment()){ +        std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; +    } +} + +void APRBase::decrement(){ +    getInstance()->_decrement(); +} + +void qpid::sys::check(apr_status_t status, const std::string& file, const int line){ +    if (status != APR_SUCCESS){ +        const int size = 50; +        char tmp[size]; +        std::string msg(apr_strerror(status, tmp, size)); +        throw QpidError(APR_ERROR + ((int) status), msg, +                        qpid::SrcLine(file, line)); +    } +} + +std::string qpid::sys::get_desc(apr_status_t status){ +    const int size = 50; +    char tmp[size]; +    return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/cpp/lib/common/sys/apr/APRBase.h b/cpp/lib/common/sys/apr/APRBase.h new file mode 100644 index 0000000000..d1b3e21b91 --- /dev/null +++ b/cpp/lib/common/sys/apr/APRBase.h @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include <string> +#include <apr_thread_mutex.h> +#include <apr_errno.h> + +namespace qpid { +namespace sys { + +    /** +     * Use of APR libraries necessitates explicit init and terminate +     * calls. Any class using APR libs should obtain the reference to +     * this singleton and increment on construction, decrement on +     * destruction. This class can then correctly initialise apr +     * before the first use and terminate after the last use. +     */ +    class APRBase{ +	static APRBase* instance; +	apr_pool_t* pool; +	apr_thread_mutex_t* mutex; +	int count; + +	APRBase(); +	~APRBase(); +	static APRBase* getInstance(); +	bool _increment(); +	void _decrement(); +    public: +	static void increment(); +	static void decrement(); +    }; + +    //this is also a convenient place for a helper function for error checking: +    void check(apr_status_t status, const std::string& file, const int line); +    std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__);  + +} +} + + + + +#endif diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp new file mode 100644 index 0000000000..e8b71f6e8a --- /dev/null +++ b/cpp/lib/common/sys/apr/APRPool.cpp @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "APRBase.h" +#include <boost/pool/detail/singleton.hpp> + +using namespace qpid::sys; + +APRPool::APRPool(){ +    APRBase::increment(); +    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRPool::~APRPool(){ +    apr_pool_destroy(pool); +    APRBase::decrement(); +} + +apr_pool_t* APRPool::get() { +    return boost::details::pool::singleton_default<APRPool>::instance().pool; +} + diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h new file mode 100644 index 0000000000..da7661fcfa --- /dev/null +++ b/cpp/lib/common/sys/apr/APRPool.h @@ -0,0 +1,50 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <apr_pools.h> + +namespace qpid { +namespace sys { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { +  public: +    APRPool(); +    ~APRPool(); + +    /** Get singleton instance */ +    static apr_pool_t* get(); + +  private: +    apr_pool_t* pool; +}; + +}} + + + + + +#endif  /*!_APRPool_*/ diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp new file mode 100644 index 0000000000..4917803370 --- /dev/null +++ b/cpp/lib/common/sys/apr/APRSocket.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "APRBase.h" +#include "APRSocket.h" +#include <assert.h> +#include <iostream> + +using namespace qpid::sys; +using namespace qpid::framing; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ +    apr_size_t bytes; +    bytes = buffer.available(); +    apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); +    buffer.move(bytes); +    if(APR_STATUS_IS_TIMEUP(s)){ +        //timed out +    }else if(APR_STATUS_IS_EOF(s)){ +        close(); +    } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ +    apr_size_t bytes; +    do{ +        bytes = buffer.available(); +        apr_socket_send(socket, buffer.start(), &bytes); +        buffer.move(bytes);     +    }while(bytes > 0); +} + +void APRSocket::close(){ +    if(!closed){ +        std::cout << "Closing socket " << socket << "@" << this << std::endl; +        CHECK_APR_SUCCESS(apr_socket_close(socket)); +        closed = true; +    } +} + +bool APRSocket::isOpen(){ +    return !closed; +} + +u_int8_t APRSocket::read(){ +    char data[1]; +    apr_size_t bytes = 1; +    apr_status_t s = apr_socket_recv(socket, data, &bytes); +    if(APR_STATUS_IS_EOF(s) || bytes == 0){ +        return 0; +    }else{ +        return *data; +    } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h new file mode 100644 index 0000000000..53f1055c6a --- /dev/null +++ b/cpp/lib/common/sys/apr/APRSocket.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include <apr_network_io.h> +#include <Buffer.h> + +namespace qpid { +namespace sys { + +    class APRSocket +    { +	apr_socket_t* const socket; +        volatile bool closed; +    public: +	APRSocket(apr_socket_t* socket); +        void read(qpid::framing::Buffer& b); +        void write(qpid::framing::Buffer& b); +        void close(); +        bool isOpen(); +        u_int8_t read(); +	~APRSocket(); +    }; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp new file mode 100644 index 0000000000..2b6fc92623 --- /dev/null +++ b/cpp/lib/common/sys/apr/LFProcessor.cpp @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sstream> +#include <QpidError.h> +#include "LFProcessor.h" +#include "APRBase.h" +#include "LFSessionContext.h" + +using namespace qpid::sys; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : +    size(_size), +    timeout(_timeout),  +    signalledCount(0), +    current(0), +    count(0), +    workerCount(_workers), +    hasLeader(false), +    workers(new Thread[_workers]), +    stopped(false) +{ + +    CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); +} + + +LFProcessor::~LFProcessor(){ +    if (!stopped) stop(); +    delete[] workers; +    CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ +    for(int i = 0; i < workerCount; i++){ +        workers[i] = Thread(this); +    } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ +    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +    Monitor::ScopedLock l(countLock); +    sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); +    count++; +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ +    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +    Monitor::ScopedLock l(countLock); +    sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); +    count--; +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ +    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ +    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ +    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ +    Mutex::ScopedLock locker(countLock); +    return count == size;  +} + +bool LFProcessor::empty(){ +    Mutex::ScopedLock locker(countLock); +    return count == 0;  +} + +void LFProcessor::poll() { +    apr_status_t status = APR_EGENERAL; +    do{ +        current = 0; +        if(!stopped){ +            status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); +        } +    }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ +    try{ +        while(!stopped){ +            const apr_pollfd_t* event = 0; +            LFSessionContext* session = 0; +            { +                Monitor::ScopedLock l(leadLock); +                waitToLead(); +                event = getNextEvent(); +                if(!event) return; +                session = reinterpret_cast<LFSessionContext*>( +                    event->client_data); +                session->startProcessing(); +                relinquishLead(); +            } + +            //process event: +            if(event->rtnevents & APR_POLLIN) session->read(); +            if(event->rtnevents & APR_POLLOUT) session->write(); + +            if(session->isClosed()){ +                session->handleClose(); +                Monitor::ScopedLock l(countLock); +                sessions.erase(find(sessions.begin(),sessions.end(), session)); +                count--; +            }else{ +                session->stopProcessing(); +            } +        } +    }catch(std::exception e){ +	std::cout << e.what() << std::endl; +    } +} + +void LFProcessor::waitToLead(){ +    while(hasLeader && !stopped) leadLock.wait(); +    hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ +    hasLeader = false; +    leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ +    while(true){ +        if(stopped){ +            return 0; +        }else if(current < signalledCount){ +            //use result of previous poll if one is available +            return signalledFDs + (current++); +        }else{ +            //else poll to get new events +            poll(); +        } +    } +} + +void LFProcessor::stop(){ +    stopped = true; +    { +        Monitor::ScopedLock l(leadLock); +        leadLock.notifyAll(); +    } +    for(int i = 0; i < workerCount; i++){ +        workers[i].join(); +    } +    for(iterator i = sessions.begin(); i < sessions.end(); i++){ +        (*i)->shutdown(); +    } +} + diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h new file mode 100644 index 0000000000..de90199472 --- /dev/null +++ b/cpp/lib/common/sys/apr/LFProcessor.h @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include <apr_poll.h> +#include <iostream> +#include <vector> +#include <sys/Monitor.h> +#include <sys/Runnable.h> +#include <sys/Thread.h> + +namespace qpid { +namespace sys { + +    class LFSessionContext; + +    /** +     * This class processes a poll set using the leaders-followers +     * pattern for thread synchronization: the leader will poll and on +     * the poll returning, it will remove a session, promote a +     * follower to leadership, then process the session. +     */ +    class LFProcessor : private virtual qpid::sys::Runnable +    { +        typedef std::vector<LFSessionContext*>::iterator iterator; +         +        const int size; +        const apr_interval_time_t timeout; +        apr_pollset_t* pollset; +        int signalledCount; +        int current; +        const apr_pollfd_t* signalledFDs; +        int count; +        const int workerCount; +        bool hasLeader; +        qpid::sys::Thread* workers; +        qpid::sys::Monitor leadLock; +        qpid::sys::Mutex countLock; +        std::vector<LFSessionContext*> sessions; +        volatile bool stopped; + +        const apr_pollfd_t* getNextEvent(); +        void waitToLead(); +        void relinquishLead(); +        void poll();         +        virtual void run();         + +    public: +        LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); +        /** +         * Add the fd to the poll set. Relies on the client_data being +         * an instance of LFSessionContext. +         */ +        void add(const apr_pollfd_t* const fd); +        /** +         * Remove the fd from the poll set. +         */ +        void remove(const apr_pollfd_t* const fd); +        /** +         * Signal that the fd passed in, already part of the pollset, +         * has had its flags altered. +         */ +        void update(const apr_pollfd_t* const fd); +        /** +         * Add an fd back to the poll set after deactivation. +         */ +        void reactivate(const apr_pollfd_t* const fd); +        /** +         * Temporarily remove the fd from the poll set. Called when processing +         * is about to begin. +         */ +        void deactivate(const apr_pollfd_t* const fd); +        /** +         * Indicates whether the capacity of this processor has been +         * reached (or whether it can still handle further fd's). +         */ +        bool full(); +        /** +         * Indicates whether there are any fd's registered. +         */ +        bool empty(); +        /** +         * Stop processing. +         */ +        void stop(); +        /** +         * Start processing. +         */ +        void start(); +        /** +         * Is processing stopped? +         */ +        bool isStopped(); +         +	~LFProcessor(); +    }; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp new file mode 100644 index 0000000000..7fb8d5a91b --- /dev/null +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include <QpidError.h> +#include <assert.h> + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,  +                                   LFProcessor* const _processor, +                                   bool _debug) : +    debug(_debug), +    socket(_socket), +    initiated(false), +    in(65536), +    out(65536), +    processor(_processor), +    processing(false), +    closing(false) +{ +     +    fd.p = _pool; +    fd.desc_type = APR_POLL_SOCKET; +    fd.reqevents = APR_POLLIN; +    fd.client_data = this; +    fd.desc.s = _socket; + +    out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ +    socket.read(in); +    in.flip(); +    if(initiated){ +        AMQFrame frame; +        while(frame.decode(in)){ +            if(debug) log("RECV", &frame); +            handler->received(&frame); +        } +    }else{ +        ProtocolInitiation protocolInit; +        if(protocolInit.decode(in)){ +            handler->initiated(&protocolInit); +            initiated = true; +            if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; +        } +    } +    in.compact(); +} + +void LFSessionContext::write(){ +    bool done = isClosed(); +    while(!done){ +        if(out.available() > 0){ +            socket.write(out); +            if(out.available() > 0){ + +                //incomplete write, leave flags to receive notification of readiness to write +                done = true;//finished processing for now, but write is still in progress +            } +        }else{ +            //do we have any frames to write? +            Mutex::ScopedLock l(writeLock); +            if(!framesToWrite.empty()){ +                out.clear(); +                bool encoded(false); +                AMQFrame* frame = framesToWrite.front(); +                while(frame && out.available() >= frame->size()){ +                    encoded = true; +                    frame->encode(out); +                    if(debug) log("SENT", frame); +                    delete frame; +                    framesToWrite.pop(); +                    frame = framesToWrite.empty() ? 0 : framesToWrite.front(); +                } +                if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); +                out.flip(); +            }else{ +                //reset flags, don't care about writability anymore +                fd.reqevents = APR_POLLIN; +                done = true; + +                if(closing){ +                    socket.close(); +                } +            } +        } +    } +} + +void LFSessionContext::send(AMQFrame* frame){ +    Mutex::ScopedLock l(writeLock); +    if(!closing){ +        framesToWrite.push(frame); +        if(!(fd.reqevents & APR_POLLOUT)){ +            fd.reqevents |= APR_POLLOUT; +            if(!processing){ +                processor->update(&fd); +            } +        } +    } +} + +void LFSessionContext::startProcessing(){ +    Mutex::ScopedLock l(writeLock); +    processing = true; +    processor->deactivate(&fd); +} + +void LFSessionContext::stopProcessing(){ +    Mutex::ScopedLock l(writeLock); +    processor->reactivate(&fd); +    processing = false; +} + +void LFSessionContext::close(){ +    closing = true; +    Mutex::ScopedLock l(writeLock); +    if(!processing){ +        //allow pending frames to be written to socket +        fd.reqevents = APR_POLLOUT; +        processor->update(&fd); +    } +} + +void LFSessionContext::handleClose(){ +    handler->closed(); +    std::cout << "Session closed [" << &socket << "]" << std::endl; +    delete handler; +    delete this; +} + +void LFSessionContext::shutdown(){ +    socket.close(); +    handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ +    handler = _handler; +    processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ +    Mutex::ScopedLock l(logLock); +    std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; +} + +Mutex LFSessionContext::logLock; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h new file mode 100644 index 0000000000..9483cbe590 --- /dev/null +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include <apr_network_io.h> +#include <apr_poll.h> +#include <apr_time.h> + +#include <AMQFrame.h> +#include <Buffer.h> +#include <sys/Monitor.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> + +#include "APRSocket.h" +#include "LFProcessor.h" + +namespace qpid { +namespace sys { + + +class LFSessionContext : public virtual qpid::sys::SessionContext +{ +    const bool debug; +    APRSocket socket; +    bool initiated; +         +    qpid::framing::Buffer in; +    qpid::framing::Buffer out; +         +    qpid::sys::SessionHandler* handler; +    LFProcessor* const processor; + +    apr_pollfd_t fd; + +    std::queue<qpid::framing::AMQFrame*> framesToWrite; +    qpid::sys::Mutex writeLock; +         +    bool processing; +    bool closing; + +    static qpid::sys::Mutex logLock; +    void log(const std::string& desc, +             qpid::framing::AMQFrame* const frame); +         + +  public: +    LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,  +                     LFProcessor* const processor,  +                     bool debug = false); +    virtual ~LFSessionContext(); +    virtual void send(qpid::framing::AMQFrame* frame); +    virtual void close();         +    void read(); +    void write(); +    void init(qpid::sys::SessionHandler* handler); +    void startProcessing(); +    void stopProcessing(); +    void handleClose();         +    void shutdown();         +    inline apr_pollfd_t* const getFd(){ return &fd; } +    inline bool isClosed(){ return !socket.isOpen(); } +}; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp new file mode 100644 index 0000000000..336eb4996a --- /dev/null +++ b/cpp/lib/common/sys/apr/Socket.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <sys/Socket.h> +#include <apr/APRBase.h> +#include <apr/APRPool.h> + + +using namespace qpid::sys; + +Socket Socket::createTcp() { +    Socket s; +    CHECK_APR_SUCCESS( +        apr_socket_create( +            &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, +            APRPool::get())); +    return s; +} + +Socket::Socket(apr_socket_t* s) { +    socket = s; +} + +void Socket::setTimeout(Time interval) { +    apr_socket_timeout_set(socket, interval/TIME_USEC); +} + +void Socket::connect(const std::string& host, int port) { +    apr_sockaddr_t* address; +    CHECK_APR_SUCCESS( +        apr_sockaddr_info_get( +            &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, +            APRPool::get())); +    CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); +} + +void Socket::close() { +    if (socket == 0) return; +    CHECK_APR_SUCCESS(apr_socket_close(socket)); +    socket = 0; +} + +ssize_t Socket::send(const void* data, size_t size) +{ +    apr_size_t sent = size; +    apr_status_t status = +        apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent); +    if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; +    if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; +    CHECK_APR_SUCCESS(status); +    return sent; +} + +ssize_t Socket::recv(void* data, size_t size) +{ +    apr_size_t received = size; +    apr_status_t status = +        apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); +    if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; +    CHECK_APR_SUCCESS(status); +     return received; +} + + diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp new file mode 100644 index 0000000000..5c4799aa96 --- /dev/null +++ b/cpp/lib/common/sys/apr/Thread.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Thread.h> + +using namespace qpid::sys; +using qpid::sys::Runnable; + +void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) { +    reinterpret_cast<Runnable*>(data)->run(); +    CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); +    return NULL; +}  + + diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp new file mode 100644 index 0000000000..16c7ec9c3f --- /dev/null +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -0,0 +1,325 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <mqueue.h> +#include <string.h> +#include <iostream> + +#include <sys/errno.h> +#include <sys/socket.h> +#include <sys/epoll.h> + +#include <typeinfo> +#include <iostream> +#include <queue> + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/current_function.hpp> + +#include <QpidError.h> +#include <sys/Monitor.h> + +#include "check.h" +#include "EventChannel.h" + +using namespace std; + + +// Convenience template to zero out a struct. +template <class S> struct ZeroStruct : public S { +    ZeroStruct() { memset(this, 0, sizeof(*this)); } +}; +     +namespace qpid { +namespace sys { + + +/** + * EventHandler wraps an epoll file descriptor. Acts as private + * interface between EventChannel and subclasses. + * + * Also implements Event interface for events that are not associated + * with a file descriptor and are passed via the message queue. + */  +class EventHandler : public Event, private Monitor +{ +  public: +    EventHandler(int epollSize = 256); +    ~EventHandler(); + +    int getEpollFd() { return epollFd; } +    void epollAdd(int fd, uint32_t epollEvents, Event* event); +    void epollMod(int fd, uint32_t epollEvents, Event* event); +    void epollDel(int fd); + +    void mqPut(Event* event); +    Event* mqGet(); +     +  protected: +    // Should never be called, only complete. +    void prepare(EventHandler&) { assert(0); } +    Event* complete(EventHandler& eh); +     +  private: +    int epollFd; +    std::string mqName; +    int mqFd; +    std::queue<Event*> mqEvents; +}; + +EventHandler::EventHandler(int epollSize) +{ +    epollFd = epoll_create(epollSize); +    if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +    // Create a POSIX message queue for non-fd events. +    // We write one byte and never read it is always ready for read +    // when we add it to epoll. +    //  +    ZeroStruct<struct mq_attr> attr; +    attr.mq_maxmsg = 1; +    attr.mq_msgsize = 1; +    do { +        char tmpnam[L_tmpnam]; +        tmpnam_r(tmpnam); +        mqName = tmpnam + 4; // Skip "tmp/" +        mqFd = mq_open( +            mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); +        if (mqFd < 0) throw QPID_POSIX_ERROR(errno); +    } while (mqFd == EEXIST);  // Name already taken, try again. + +    static char zero = '\0'; +    mq_send(mqFd, &zero, 1, 0); +    epollAdd(mqFd, 0, this); +} + +EventHandler::~EventHandler() { +    mq_close(mqFd); +    mq_unlink(mqName.c_str()); +} + +void EventHandler::mqPut(Event* event) { +    ScopedLock l(*this); +    assert(event != 0); +    mqEvents.push(event); +    epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +} + +Event* EventHandler::mqGet() { +    ScopedLock l(*this); +    if (mqEvents.empty())  +        return 0; +    Event* event = mqEvents.front(); +    mqEvents.pop(); +    if(!mqEvents.empty()) +        epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +    return event; +} + +void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) +{ +    ZeroStruct<struct epoll_event> ee; +    ee.data.ptr = event; +    ee.events = epollEvents; +    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)  +        throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) +{ +    ZeroStruct<struct epoll_event> ee; +    ee.data.ptr = event; +    ee.events = epollEvents; +    if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)  +        throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollDel(int fd) { +    if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) +        throw QPID_POSIX_ERROR(errno); +} + +Event* EventHandler::complete(EventHandler& eh) +{ +    assert(&eh == this); +    Event* event =  mqGet(); +    return event==0 ? 0 : event->complete(eh); +} +     +// ================================================================ +// EventChannel + +EventChannel::shared_ptr EventChannel::create() { +    return shared_ptr(new EventChannel()); +} + +EventChannel::EventChannel() : handler(new EventHandler()) {} + +EventChannel::~EventChannel() {} + +void EventChannel::postEvent(Event& e)  +{ +    e.prepare(*handler); +} + +Event* EventChannel::getEvent() +{ +    static const int infiniteTimeout = -1;  +    ZeroStruct<struct epoll_event> epollEvent; + +    // Loop until we can complete the event. Some events may re-post +    // themselves and return 0 from complete, e.g. partial reads. //  +    Event* event = 0; +    while (event == 0) { +        int eventCount = epoll_wait(handler->getEpollFd(), +                                    &epollEvent, 1, infiniteTimeout); +        if (eventCount < 0) { +            if (errno != EINTR) { +                // TODO aconway 2006-11-28: Proper handling/logging of errors. +                cerr << BOOST_CURRENT_FUNCTION << " ignoring error " +                     << PosixError::getMessage(errno) << endl; +                assert(0); +            } +        } +        else if (eventCount == 1) { +            event = reinterpret_cast<Event*>(epollEvent.data.ptr); +            assert(event != 0); +            try { +                event = event->complete(*handler); +            } +            catch (const Exception& e) { +                if (event) +                    event->setError(e); +            } +            catch (const std::exception& e) { +                if (event) +                    event->setError(e); +            } +        } +    } +    return event; +} + +Event::~Event() {} +     +void Event::prepare(EventHandler& handler) +{ +    handler.mqPut(this); +} + +bool Event::hasError() const { +    return error; +} + +void Event::throwIfError() throw (Exception) { +    if (hasError()) +        error.throwSelf(); +} + +Event* Event::complete(EventHandler&) +{ +    return this; +} + +void Event::dispatch() +{ +    try { +        if (!callback.empty()) +            callback(); +    } catch (const std::exception&) { +        throw; +    } catch (...) { +        throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); +    } +} + +void Event::setError(const ExceptionHolder& e) { +    error = e; +} + +void ReadEvent::prepare(EventHandler& handler) +{ +    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +ssize_t ReadEvent::doRead() { +    ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, +                       size - received); +    if (n > 0) received += n; +    return n; +} + +Event* ReadEvent::complete(EventHandler& handler) +{ +    // Read as much as possible without blocking. +    ssize_t n = doRead(); +    while (n > 0 && received < size) doRead(); + +    if (received == size) { +        handler.epollDel(descriptor); +        received = 0;           // Reset for re-use. +        return this; +    } +    else if (n <0 && (errno == EAGAIN)) { +        // Keep polling for more. +        handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); +        return 0; +    } +    else { +        // Unexpected EOF or error. Throw ENODATA for EOF. +        handler.epollDel(descriptor); +        received = 0;           // Reset for re-use. +        throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); +    } +} + +void WriteEvent::prepare(EventHandler& handler) +{ +    handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +} + +Event* WriteEvent::complete(EventHandler& handler) +{ +    ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, +                      size - written); +    if (n < 0) throw QPID_POSIX_ERROR(errno); +    written += n; +    if(written < size) { +        // Keep polling. +        handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); +        return 0; +    } +    written = 0;                // Reset for re-use. +    handler.epollDel(descriptor); +    return this; +} + +void AcceptEvent::prepare(EventHandler& handler) +{ +    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +Event* AcceptEvent::complete(EventHandler& handler) +{ +    handler.epollDel(descriptor); +    accepted = ::accept(descriptor, 0, 0); +    if (accepted < 0) throw QPID_POSIX_ERROR(errno); +    return this; +} + +}} diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h new file mode 100644 index 0000000000..49c7fce740 --- /dev/null +++ b/cpp/lib/common/sys/posix/EventChannel.h @@ -0,0 +1,176 @@ +#ifndef _sys_EventChannel_h +#define _sys_EventChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <SharedObject.h> +#include <ExceptionHolder.h> +#include <boost/function.hpp> +#include <memory> + +namespace qpid { +namespace sys { + +class Event; +class EventHandler; +class EventChannel; + +/** + * Base class for all Events. + */ +class Event +{ +  public: +    /** Type for callback when event is dispatched */ +    typedef boost::function0<void> Callback; + +    /** +     * Create an event with optional callback. +     * Instances of Event are sent directly through the channel. +     * Derived classes define additional waiting behaviour. +     *@param cb A callback functor that is invoked when dispatch() is called. +     */ +    Event(Callback cb = 0) : callback(cb) {} + +    virtual ~Event(); + +    /** Call the callback provided to the constructor, if any. */ +    void dispatch(); + +    /** True if there was an error processing this event */ +    bool hasError() const; + +    /** If hasError() throw the corresponding exception. */ +    void throwIfError() throw(Exception); + +  protected: +    virtual void prepare(EventHandler&); +    virtual Event* complete(EventHandler&); +    void setError(const ExceptionHolder& e); + +    Callback callback; +    ExceptionHolder error; + +  friend class EventChannel; +  friend class EventHandler; +}; + +template <class BufT> +class IOEvent : public Event { +  public: +    void getDescriptor() const { return descriptor; } +    size_t getSize() const { return size; } +    BufT getBuffer() const { return buffer; } +   +  protected: +    IOEvent(int fd, Callback cb, size_t sz, BufT buf) : +        Event(cb), descriptor(fd), buffer(buf), size(sz) {} + +    int descriptor; +    BufT buffer; +    size_t size; +}; + +/** Asynchronous read event */ +class ReadEvent : public IOEvent<void*> +{ +  public: +    explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : +        IOEvent<void*>(fd, cb, sz, buf), received(0) {} + +  private: +    void prepare(EventHandler&); +    Event* complete(EventHandler&); +    ssize_t doRead(); + +    size_t received; +}; + +/** Asynchronous write event */ +class WriteEvent : public IOEvent<const void*> +{ +  public: +    explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, +                        Callback cb=0) : +        IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + +  protected: +    void prepare(EventHandler&); +    Event* complete(EventHandler&); + +  private: +    ssize_t doWrite(); +    size_t written; +}; + +/** Asynchronous socket accept event */ +class AcceptEvent : public Event +{ +  public: +    /** Accept a connection on fd. */ +    explicit AcceptEvent(int fd=-1, Callback cb=0) : +        Event(cb), descriptor(fd), accepted(0) {} + +    /** Get descriptor for server socket */ +    int getAcceptedDesscriptor() const { return accepted; } + +  private: +    void prepare(EventHandler&); +    Event* complete(EventHandler&); + +    int descriptor; +    int accepted; +}; + + +class QueueSet; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ +  public: +    static shared_ptr create(); +     +    ~EventChannel(); +     +    /** Post an event to the channel. */ +    void postEvent(Event& event); + +    /** Post an event to the channel. Must not be 0. */ +    void postEvent(Event* event) { postEvent(*event); } +         +    /** +     * Wait for the next complete event. +     *@return Pointer to event. Will never return 0. +     */ +    Event* getEvent(); + +  private: +    EventChannel(); +    boost::shared_ptr<EventHandler> handler; +}; + + +}} + + + +#endif  /*!_sys_EventChannel_h*/ diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp new file mode 100644 index 0000000000..95e699e0b0 --- /dev/null +++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "EventChannelThreads.h" +#include <sys/Runnable.h> +#include <iostream> +using namespace std; +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +EventChannelThreads::shared_ptr EventChannelThreads::create( +    EventChannel::shared_ptr ec) +{ +    return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); +} + +EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : +    channel(ec), nWaiting(0), state(RUNNING) +{ +    // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. +    addThread(); +} + +EventChannelThreads::~EventChannelThreads() { +    shutdown(); +    join(); +} + +void EventChannelThreads::shutdown()  +{ +    ScopedLock lock(*this); +    if (state != RUNNING)       // Already shutting down. +        return; +    for (size_t i = 0; i < workers.size(); ++i) { +        channel->postEvent(terminate); +    } +    state = TERMINATE_SENT; +    notify();                // Wake up one join() thread. +} + +void EventChannelThreads::join()  +{ +    { +        ScopedLock lock(*this); +        while (state == RUNNING)    // Wait for shutdown to start. +            wait(); +        if (state == SHUTDOWN)      // Shutdown is complete +            return; +        if (state == JOINING) { +            // Someone else is doing the join. +            while (state != SHUTDOWN) +                wait(); +            return; +        } +        // I'm the  joining thread +        assert(state == TERMINATE_SENT);  +        state = JOINING;  +    } // Drop the lock. + +    for (size_t i = 0; i < workers.size(); ++i) { +        assert(state == JOINING); // Only this thread can change JOINING. +        workers[i].join(); +    } +    state = SHUTDOWN; +    notifyAll();                // Notify other join() threaeds. +} + +void EventChannelThreads::addThread() { +    ScopedLock l(*this); +    workers.push_back(Thread(*this)); +} + +void EventChannelThreads::run() +{ +    // Start life waiting. Decrement on exit. +    AtomicCount::ScopedIncrement inc(nWaiting); +    try { +        while (true) { +            Event* e = channel->getEvent();  +            assert(e != 0); +            if (e == &terminate) { +                return; +            } +            AtomicCount::ScopedDecrement dec(nWaiting); +            // I'm no longer waiting, make sure someone is. +            if (dec == 0) +                addThread(); +            e->dispatch(); +        } +    } +    catch (const std::exception& e) { +        // TODO aconway 2006-11-15: need better logging across the board. +        std::cerr << "EventChannelThreads::run() caught: " << e.what() +                  << std::endl; +    } +    catch (...) { +        std::cerr << "EventChannelThreads::run() caught unknown exception." +                  << std::endl; +    } +} + +}} diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h new file mode 100644 index 0000000000..98403c0869 --- /dev/null +++ b/cpp/lib/common/sys/posix/EventChannelThreads.h @@ -0,0 +1,92 @@ +#ifndef _posix_EventChannelThreads_h +#define _sys_EventChannelThreads_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> + +#include <Exception.h> +#include <sys/Time.h> +#include <sys/Monitor.h> +#include <sys/Thread.h> +#include <sys/AtomicCount.h> +#include "EventChannel.h" + +namespace qpid { +namespace sys { + +/** +   Dynamic thread pool serving an EventChannel. + +   Threads run a loop { e = getEvent(); e->dispatch(); } +   The size of the thread pool is automatically adjusted to optimal size. +*/ +class EventChannelThreads : +        public qpid::SharedObject<EventChannelThreads>, +        public sys::Monitor, private sys::Runnable +{ +  public: +    /** Create the thread pool and start initial threads. */ +    static EventChannelThreads::shared_ptr create( +        EventChannel::shared_ptr channel +    ); + +    ~EventChannelThreads(); + +    /** Post event to the underlying channel */ +    void postEvent(Event& event) { channel->postEvent(event); } + +    /** Post event to the underlying channel Must not be 0. */ +    void postEvent(Event* event) { channel->postEvent(event); } + +    /** +     * Terminate all threads. +     * +     * Returns immediately, use join() to wait till all threads are +     * shut down.  +     */ +    void shutdown(); +     +    /** Wait for all threads to terminate. */ +    void join(); + +  private: +    typedef std::vector<sys::Thread> Threads; +    typedef enum { +        RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN +    } State; + +    EventChannelThreads(EventChannel::shared_ptr underlyingChannel); +    void addThread(); + +    void run(); +    bool keepRunning(); +    void adjustThreads(); + +    EventChannel::shared_ptr channel; +    Threads workers; +    sys::AtomicCount nWaiting; +    State state; +    Event terminate; +}; + + +}} + + +#endif  /*!_sys_EventChannelThreads_h*/ diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp new file mode 100644 index 0000000000..842aa76f36 --- /dev/null +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Acceptor.h> +#include <Exception.h> + +namespace qpid { +namespace sys { + +namespace { +void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } +} + +class PosixAcceptor : public Acceptor { +  public: +    virtual int16_t getPort() const { fail(); return 0; } +    virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } +    virtual void shutdown() { fail(); } +}; + +// Define generic Acceptor::create() to return APRAcceptor. +    Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) +{ +    return Acceptor::shared_ptr(new PosixAcceptor()); +} + +// Must define Acceptor virtual dtor. +Acceptor::~Acceptor() {} + +}} diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp new file mode 100644 index 0000000000..5bd13742f6 --- /dev/null +++ b/cpp/lib/common/sys/posix/Socket.cpp @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/socket.h> +#include <sys/errno.h> +#include <netinet/in.h> +#include <netdb.h> + +#include <boost/format.hpp> + +#include <QpidError.h> +#include <posix/check.h> +#include <sys/Socket.h> + +using namespace qpid::sys; + +Socket Socket::createTcp()  +{ +    int s = ::socket (PF_INET, SOCK_STREAM, 0); +    if (s < 0) throw QPID_POSIX_ERROR(errno); +    return s; +} + +Socket::Socket(int descriptor) : socket(descriptor) {} + +void Socket::setTimeout(Time interval) +{ +    struct timeval tv; +    tv.tv_sec = interval/TIME_SEC; +    tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; +    setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); +    setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); +} + +void Socket::connect(const std::string& host, int port) +{ +    struct sockaddr_in name; +    name.sin_family = AF_INET; +    name.sin_port = htons(port); +    struct hostent* hp = gethostbyname ( host.c_str() ); +    if (hp == 0) throw QPID_POSIX_ERROR(errno); +    memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); +    if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) +        throw QPID_POSIX_ERROR(errno); +} + +void +Socket::close() +{ +    if (socket == 0) return; +    if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); +    socket = 0; +} + +ssize_t +Socket::send(const void* data, size_t size) +{ +    ssize_t sent = ::send(socket, data, size, 0); +    if (sent < 0) { +        if (errno == ECONNRESET) return SOCKET_EOF; +        if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; +        throw QPID_POSIX_ERROR(errno); +    } +    return sent; +} + +ssize_t +Socket::recv(void* data, size_t size) +{ +    ssize_t received = ::recv(socket, data, size, 0); +    if (received < 0) { +        if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; +        throw QPID_POSIX_ERROR(errno); +    } +    return received; +} + +int Socket::listen(int port, int backlog)  +{ +    struct sockaddr_in name; +    name.sin_family = AF_INET; +    name.sin_port = htons(port); +    name.sin_addr.s_addr = 0; +    if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) +        throw QPID_POSIX_ERROR(errno); +    if (::listen(socket, backlog) < 0) +        throw QPID_POSIX_ERROR(errno); +     +    socklen_t namelen = sizeof(name); +    if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) +        throw QPID_POSIX_ERROR(errno); + +    return ntohs(name.sin_port); +} + + +int Socket::fd()  +{ +    return socket; +} diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp new file mode 100644 index 0000000000..f524799556 --- /dev/null +++ b/cpp/lib/common/sys/posix/Thread.cpp @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Thread.h> + +void* qpid::sys::Thread::runRunnable(void* p) +{ +    static_cast<Runnable*>(p)->run(); +    return 0; +} diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp new file mode 100644 index 0000000000..408679caa8 --- /dev/null +++ b/cpp/lib/common/sys/posix/check.cpp @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <cerrno> +#include "check.h"  + +namespace qpid { +namespace sys { + +std::string +PosixError::getMessage(int errNo) +{ +    char buf[512]; +    return std::string(strerror_r(errNo, buf, sizeof(buf))); +} + +PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() +    : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) +{ } +     +}} diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h new file mode 100644 index 0000000000..5afbe8f5a8 --- /dev/null +++ b/cpp/lib/common/sys/posix/check.h @@ -0,0 +1,62 @@ +#ifndef _posix_check_h +#define _posix_check_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <cerrno> +#include <string> +#include <QpidError.h> + +namespace qpid { +namespace sys { + +/** + * Exception with message from errno. + */ +class PosixError : public qpid::QpidError +{ +  public: +    static std::string getMessage(int errNo); +     +    PosixError(int errNo, const qpid::SrcLine& location) throw(); +     +    ~PosixError() throw() {} +     +    int getErrNo() { return errNo; } + +    Exception* clone() const throw() { return new PosixError(*this); } +         +    void throwSelf() { throw *this; } + +  private: +    int errNo; +}; + +}} + +/** Create a PosixError for the current file/line and errno. */ +#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) + +/** Throw a posix error if errNo is non-zero */ +#define QPID_POSIX_THROW_IF(ERRNO)              \ +    if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) +#endif  /*!_posix_check_h*/ | 
