diff options
| author | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 | 
| commit | 9094d2b10ecadd66fa3b22169183e7573cc79629 (patch) | |
| tree | bf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp/src | |
| parent | 0487ea40bc6568765cdec75a36273eeb26fae854 (diff) | |
| download | qpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz | |
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
58 files changed, 464 insertions, 1480 deletions
| diff --git a/cpp/src/qpid/SharedObject.h b/cpp/src/qpid/SharedObject.h new file mode 100644 index 0000000000..15f333173a --- /dev/null +++ b/cpp/src/qpid/SharedObject.h @@ -0,0 +1,52 @@ +#ifndef _SharedObject_ +#define _SharedObject_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +    /** +     * Template to enforce shared object conventions. +     * Shared object classes should inherit : public qpid::SharedObject +     * That ensures Foo: +     * - has typedef boost::shared_ptr<T> SharedPtr +     * - has virtual destructor +     * - is boost::noncopyable (no default copy or assign) +     * - has a protected default constructor. +     * +     * Shared objects should not have public constructors. +     * Make constructors protected and provide public statc create() +     * functions that return a SharedPtr. +     */ +    template <class T> +    class SharedObject : private boost::noncopyable +    { +      public: +        typedef boost::shared_ptr<T> SharedPtr; + +        virtual ~SharedObject() {}; + +      protected: +        SharedObject() {}  +    }; +} + +#endif  /*!_SharedObject_*/ diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index 9faa4aa4c4..509ac3bec1 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -20,17 +20,17 @@  #include <iostream>  #include <queue> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/concurrent/ThreadFactory.h"  namespace qpid {      namespace broker{          class AutoDelete : private virtual qpid::concurrent::Runnable{ -            qpid::concurrent::ThreadFactoryImpl factory; -            qpid::concurrent::MonitorImpl lock;             -            qpid::concurrent::MonitorImpl monitor;             +            qpid::concurrent::ThreadFactory factory; +            qpid::concurrent::Monitor lock;             +            qpid::concurrent::Monitor monitor;                          std::queue<Queue::shared_ptr> queues;              QueueRegistry* const registry;              const u_int32_t period; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fe859b240b..7b5f9e3e32 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -18,60 +18,30 @@  #include <iostream>  #include <memory>  #include "qpid/broker/Broker.h" -#include "qpid/io/Acceptor.h" -#include "qpid/broker/Configuration.h" -#include "qpid/QpidError.h" -#include "qpid/broker/SessionHandlerFactoryImpl.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/io/LFAcceptor.h"  using namespace qpid::broker;  using namespace qpid::io; -namespace { -    Acceptor* createAcceptor(const Configuration& config){ -        const string type(config.getAcceptor()); -        if("blocking" == type){ -            std::cout << "Using blocking acceptor " << std::endl; -            return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); -        }else if("non-blocking" == type){ -            std::cout << "Using non-blocking acceptor " << std::endl; -            return new LFAcceptor(config.isTrace(),  -                                  config.getConnectionBacklog(),  -                                  config.getWorkerThreads(), -                                  config.getMaxConnections()); -        } -        throw Configuration::ParseException("Unrecognised acceptor: " + type); -    } -} -  Broker::Broker(const Configuration& config) : -    acceptor(createAcceptor(config)), -    port(config.getPort()), -    isBound(false) {} +    acceptor(new Acceptor(config.getPort(), +                          config.getConnectionBacklog(), +                          config.getWorkerThreads())) +{ } + -Broker::shared_ptr Broker::create(int port)  +Broker::SharedPtr Broker::create(int16_t port)   {      Configuration config;      config.setPort(port);      return create(config);  } -Broker::shared_ptr Broker::create(const Configuration& config) { -    return Broker::shared_ptr(new Broker(config)); +Broker::SharedPtr Broker::create(const Configuration& config) { +    return Broker::SharedPtr(new Broker(config));  }     -int16_t Broker::bind() -{ -    if (!isBound) { -        port = acceptor->bind(port); -    } -    return port; -} -  void Broker::run() { -    bind();      acceptor->run(&factory);  } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8581093910..dd87c47909 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -19,47 +19,35 @@   *   */ -#include "qpid/io/Acceptor.h"  #include "qpid/broker/Configuration.h" -#include "qpid/concurrent/Runnable.h"  #include "qpid/broker/SessionHandlerFactoryImpl.h" -#include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/Acceptor.h" +#include <qpid/SharedObject.h>  namespace qpid {      namespace broker {          /**           * A broker instance.            */ -        class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { -            Broker(const Configuration& config); // Private, use create() -            std::auto_ptr<qpid::io::Acceptor> acceptor; -            SessionHandlerFactoryImpl factory; -            int16_t port; -            bool isBound; -             +        class Broker : public qpid::concurrent::Runnable, +                       public qpid::SharedObject<Broker> +        {            public:              static const int16_t DEFAULT_PORT;              virtual ~Broker(); -            typedef boost::shared_ptr<Broker> shared_ptr;              /**               * Create a broker.               * @param port Port to listen on or 0 to pick a port dynamically.               */ -            static shared_ptr create(int port = DEFAULT_PORT); +            static SharedPtr create(int16_t port = DEFAULT_PORT);              /** -             * Create a broker from a Configuration. +             * Create a broker using a Configuration.               */ -            static shared_ptr create(const Configuration& config); - -            /** -             * Bind to the listening port. -             * @return The port number bound.  -             */ -            virtual int16_t bind(); +            static SharedPtr create(const Configuration& config);              /**               * Return listening port. If called before bind this is @@ -67,7 +55,7 @@ namespace qpid {               * port, which will be different if the configured port is               * 0.               */ -            virtual int16_t getPort() { return port; } +            virtual int16_t getPort() const { return acceptor->getPort(); }              /**               * Run the broker. Implements Runnable::run() so the broker @@ -77,6 +65,11 @@ namespace qpid {              /** Shut down the broker */              virtual void shutdown(); + +          private: +            Broker(const Configuration& config);  +            qpid::io::Acceptor::SharedPtr acceptor; +            SessionHandlerFactoryImpl factory;          };      }  } diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index f5aa0e45ed..13bd4cd450 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -37,7 +37,7 @@  #include "qpid/broker/TxAck.h"  #include "qpid/broker/TxBuffer.h"  #include "qpid/broker/TxPublish.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/framing/OutputHandler.h"  #include "qpid/framing/AMQContentBody.h"  #include "qpid/framing/AMQHeaderBody.h" @@ -77,7 +77,7 @@ namespace qpid {              u_int32_t framesize;              NameGenerator tagGenerator;              std::list<DeliveryRecord> unacked; -            qpid::concurrent::MonitorImpl deliveryLock; +            qpid::concurrent::Monitor deliveryLock;              TxBuffer txBuffer;              AccumulatedAck accumulatedAck;              TransactionalStore* store; diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 2dcefd878d..550b283d62 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -24,10 +24,9 @@ using namespace std;  Configuration::Configuration() :       trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),      port('p', "port", "Sets the port to listen on (default=5672)", 5672), -    workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), -    maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), +    workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), +    maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),      connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), -    acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),      help("help", "Prints usage information", false)  {      options.push_back(&trace); @@ -35,7 +34,6 @@ Configuration::Configuration() :      options.push_back(&workerThreads);      options.push_back(&maxConnections);      options.push_back(&connectionBacklog); -    options.push_back(&acceptor);      options.push_back(&help);  } @@ -85,10 +83,6 @@ int Configuration::getConnectionBacklog() const {      return connectionBacklog.getValue();  } -string Configuration::getAcceptor() const { -    return acceptor.getValue(); -} -  Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :       flag(string("-") + _flag), name("--" +_name), desc(_desc) {} diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h index 61ecc89ed9..e1e7c40947 100644 --- a/cpp/src/qpid/broker/Configuration.h +++ b/cpp/src/qpid/broker/Configuration.h @@ -92,7 +92,6 @@ namespace qpid {              IntOption workerThreads;              IntOption maxConnections;              IntOption connectionBacklog; -            StringOption acceptor;              BoolOption help;              typedef std::vector<Option*>::iterator op_iterator; @@ -116,7 +115,6 @@ namespace qpid {              int getWorkerThreads() const;              int getMaxConnections() const;              int getConnectionBacklog() const; -            std::string getAcceptor() const;              void setHelp(bool b) { help.setValue(b); }              void setTrace(bool b) { trace.setValue(b); } @@ -124,7 +122,6 @@ namespace qpid {              void setWorkerThreads(int i) { workerThreads.setValue(i); }              void setMaxConnections(int i) { maxConnections.setValue(i); }              void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } -            void setAcceptor(const std::string& val) { acceptor.setValue(val); }              void usage();          }; diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 5c5f78d90a..2c3143cd3c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -23,14 +23,14 @@  #include "qpid/broker/Exchange.h"  #include "qpid/framing/FieldTable.h"  #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  namespace qpid {  namespace broker {      class DirectExchange : public virtual Exchange{          std::map<string, std::vector<Queue::shared_ptr> > bindings; -        qpid::concurrent::MonitorImpl lock; +        qpid::concurrent::Monitor lock;      public:          static const std::string typeName; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index fca5462e72..c574a97e14 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -20,7 +20,7 @@  #include <map>  #include "qpid/broker/Exchange.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  namespace qpid {  namespace broker { @@ -29,7 +29,7 @@ namespace broker {      class ExchangeRegistry{          typedef std::map<string, Exchange::shared_ptr> ExchangeMap;          ExchangeMap exchanges; -        qpid::concurrent::MonitorImpl lock; +        qpid::concurrent::Monitor lock;      public:          std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);          void destroy(const string& name); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 334f1ccdcc..83fcdb9b34 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -23,7 +23,7 @@  #include "qpid/broker/Exchange.h"  #include "qpid/framing/FieldTable.h"  #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  namespace qpid { @@ -31,7 +31,7 @@ namespace broker {  class FanOutExchange : public virtual Exchange {      std::vector<Queue::shared_ptr> bindings; -    qpid::concurrent::MonitorImpl lock; +    qpid::concurrent::Monitor lock;    public:      static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 2e2403361e..cf699ac455 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -22,7 +22,7 @@  #include "qpid/broker/Exchange.h"  #include "qpid/framing/FieldTable.h"  #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  namespace qpid { @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange {      typedef std::vector<Binding> Bindings;      Bindings bindings; -    qpid::concurrent::MonitorImpl lock; +    qpid::concurrent::Monitor lock;    public:      static const std::string typeName; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 962c74864e..e96cc65b95 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -15,7 +15,7 @@   * limitations under the License.   *   */ -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Message.h"  #include <iostream> diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 67fb6764be..88dad7aaf9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -17,7 +17,7 @@   */  #include "qpid/broker/Queue.h"  #include "qpid/broker/MessageStore.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include <iostream>  using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 93570f59cc..f954e48c20 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -27,7 +27,7 @@  #include "qpid/broker/ConnectionToken.h"  #include "qpid/broker/Consumer.h"  #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  namespace qpid {      namespace broker { @@ -56,7 +56,7 @@ namespace qpid {              bool queueing;              bool dispatching;              int next; -            mutable qpid::concurrent::MonitorImpl lock; +            mutable qpid::concurrent::Monitor lock;              apr_time_t lastUsed;              Consumer* exclusive; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 973201fe64..949c194bbe 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,7 +16,7 @@   *   */  #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/SessionHandlerImpl.h"  #include <sstream>  #include <assert.h> diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 6f80291192..4f9e4b882a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -19,7 +19,7 @@  #define _QueueRegistry_  #include <map> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  namespace qpid { @@ -77,7 +77,7 @@ class QueueRegistry{    private:      typedef std::map<string, Queue::shared_ptr> QueueMap;      QueueMap queues; -    qpid::concurrent::MonitorImpl lock; +    qpid::concurrent::Monitor lock;      int counter;  }; diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 19ea732fbc..cb773b9a56 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -23,7 +23,7 @@  #include "qpid/broker/Exchange.h"  #include "qpid/framing/FieldTable.h"  #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/broker/Queue.h"  namespace qpid { @@ -71,7 +71,7 @@ class TopicPattern : public Tokens  class TopicExchange : public virtual Exchange{      typedef std::map<TopicPattern, Queue::vector> BindingMap;      BindingMap bindings; -    qpid::concurrent::MonitorImpl lock; +    qpid::concurrent::Monitor lock;    public:      static const std::string typeName; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 4d994f0510..4579b6126d 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -16,8 +16,8 @@   *   */  #include "qpid/client/Channel.h" -#include "qpid/concurrent/MonitorImpl.h" -#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h"  #include "qpid/client/Message.h"  #include "qpid/QpidError.h" @@ -36,9 +36,9 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) :      prefetch(_prefetch),       transactional(_transactional)  { -    threadFactory = new ThreadFactoryImpl(); -    dispatchMonitor = new MonitorImpl(); -    retrievalMonitor = new MonitorImpl(); +    threadFactory = new ThreadFactory(); +    dispatchMonitor = new Monitor(); +    retrievalMonitor = new Monitor();  }  Channel::~Channel(){ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index b20df92e9b..acd4488813 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -17,7 +17,7 @@   */  #include "qpid/client/Connection.h"  #include "qpid/client/Channel.h" -#include "qpid/io/ConnectorImpl.h" +#include "qpid/io/Connector.h"  #include "qpid/client/Message.h"  #include "qpid/QpidError.h"  #include <iostream> @@ -30,7 +30,7 @@ using namespace qpid::concurrent;  u_int16_t Connection::channelIdCounter;  Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ -    connector = new ConnectorImpl(debug, _max_frame_size); +    connector = new Connector(debug, _max_frame_size);  }  Connection::~Connection(){ diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp index ec20dd1a10..fcbc76f625 100644 --- a/cpp/src/qpid/client/ResponseHandler.cpp +++ b/cpp/src/qpid/client/ResponseHandler.cpp @@ -16,11 +16,11 @@   *   */  #include "qpid/client/ResponseHandler.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/QpidError.h"  qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ -    monitor = new qpid::concurrent::MonitorImpl(); +    monitor = new qpid::concurrent::Monitor();  }  qpid::client::ResponseHandler::~ResponseHandler(){ diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h deleted file mode 100644 index a396beab50..0000000000 --- a/cpp/src/qpid/concurrent/APRMonitor.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRMonitor_ -#define _APRMonitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -    class APRMonitor : public virtual Monitor  -    { -	apr_pool_t* pool; -	apr_thread_mutex_t* mutex; -	apr_thread_cond_t* condition; - -    public: -	APRMonitor(); -	virtual ~APRMonitor(); -	virtual void wait(); -	virtual void wait(u_int64_t time); -	virtual void notify(); -	virtual void notifyAll(); -	virtual void acquire(); -	virtual void release(); -    }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h deleted file mode 100644 index 6328765a06..0000000000 --- a/cpp/src/qpid/concurrent/APRThread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThread_ -#define _APRThread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - -    class APRThread : public Thread -    { -	const Runnable* runnable; -	apr_pool_t* pool; -	apr_thread_t* runner; - -    public: -	APRThread(apr_pool_t* pool, Runnable* runnable); -	virtual ~APRThread(); -	virtual void start(); -	virtual void join(); -	virtual void interrupt(); -        static unsigned int currentThread(); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h deleted file mode 100644 index 40e96fc2d1..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadFactory_ -#define _APRThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - -    class APRThreadFactory : public virtual ThreadFactory -    { -	apr_pool_t* pool; -    public: -	APRThreadFactory(); -	virtual ~APRThreadFactory(); -	virtual Thread* create(Runnable* runnable); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h deleted file mode 100644 index cab5bcc9ce..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadPool_ -#define _APRThreadPool_ - -#include <queue> -#include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - -    class APRThreadPool : public virtual ThreadPool -    { -        class Worker : public virtual Runnable{ -            APRThreadPool* pool; -        public: -            inline Worker(APRThreadPool* _pool) : pool(_pool){} -            inline virtual void run(){ -                while(pool->running){ -                    pool->runTask(); -                } -            } -        }; -        const bool deleteFactory; -        const int size; -        ThreadFactory* factory; -        APRMonitor lock;  -        std::vector<Thread*> threads; -        std::queue<Runnable*> tasks; -        Worker* worker; -        volatile bool running; - -        void runTask(); -    public: -        APRThreadPool(int size); -        APRThreadPool(int size, ThreadFactory* factory); -        virtual void start(); -        virtual void stop(); -	virtual void addTask(Runnable* task); -        virtual ~APRThreadPool(); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp index cc5eda800f..ae68cf8751 100644 --- a/cpp/src/qpid/concurrent/APRMonitor.cpp +++ b/cpp/src/qpid/concurrent/Monitor.cpp @@ -16,45 +16,45 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h"  #include <iostream> -qpid::concurrent::APRMonitor::APRMonitor(){ +qpid::concurrent::Monitor::Monitor(){      APRBase::increment();      CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));      CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));      CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));  } -qpid::concurrent::APRMonitor::~APRMonitor(){ +qpid::concurrent::Monitor::~Monitor(){      CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));      CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));      apr_pool_destroy(pool);      APRBase::decrement();  } -void qpid::concurrent::APRMonitor::wait(){ +void qpid::concurrent::Monitor::wait(){      CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));  } -void qpid::concurrent::APRMonitor::wait(u_int64_t time){ +void qpid::concurrent::Monitor::wait(u_int64_t time){      apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);      if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);  } -void qpid::concurrent::APRMonitor::notify(){ +void qpid::concurrent::Monitor::notify(){      CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));  } -void qpid::concurrent::APRMonitor::notifyAll(){ +void qpid::concurrent::Monitor::notifyAll(){      CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));  } -void qpid::concurrent::APRMonitor::acquire(){ +void qpid::concurrent::Monitor::acquire(){      CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));  } -void qpid::concurrent::APRMonitor::release(){ +void qpid::concurrent::Monitor::release(){      CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));  } diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h index 42e88c0a48..a2777cb2f1 100644 --- a/cpp/src/qpid/concurrent/Monitor.h +++ b/cpp/src/qpid/concurrent/Monitor.h @@ -18,42 +18,39 @@  #ifndef _Monitor_  #define _Monitor_ -#include "qpid/framing/amqp_types.h" +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/concurrent/Monitor.h"  namespace qpid {  namespace concurrent {  class Monitor  { +    apr_pool_t* pool; +    apr_thread_mutex_t* mutex; +    apr_thread_cond_t* condition; +    public: -    virtual ~Monitor(){} -    virtual void wait() = 0; -    virtual void wait(u_int64_t time) = 0; -    virtual void notify() = 0; -    virtual void notifyAll() = 0; -    virtual void acquire() = 0; -    virtual void release() = 0; +    Monitor(); +    virtual ~Monitor(); +    virtual void wait(); +    virtual void wait(u_int64_t time); +    virtual void notify(); +    virtual void notifyAll(); +    virtual void acquire(); +    virtual void release();  }; -/** - * Scoped locker for a monitor. - */  class Locker  {    public: -    Locker(Monitor&  lock_) : lock(lock_) { lock.acquire(); } -    ~Locker() { lock.release(); } - +    Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } +    ~Locker() { monitor.release(); }    private: -    Monitor& lock; - -    // private and unimplemented to prevent copying -    Locker(const Locker&); -    void operator=(const Locker&); +    Monitor& monitor;  }; - -} -} +}}  #endif diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h deleted file mode 100644 index 258ad140b3..0000000000 --- a/cpp/src/qpid/concurrent/MonitorImpl.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -  -  -#ifndef _MonitorImpl_ -#define _MonitorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRMonitor.h" -#else /* use POSIX Monitor */ -#include "qpid/concurrent/LMonitor.h"   -#endif - - -namespace qpid { -namespace concurrent { - -#ifdef _USE_APR_IO_ -    class MonitorImpl : public virtual APRMonitor  -    { - -    public: -	MonitorImpl() : APRMonitor(){}; -	virtual ~MonitorImpl(){}; - -    }; -#else -    class MonitorImpl : public virtual LMonitor  -    { - -    public: -	MonitorImpl() : LMonitor(){}; -	virtual ~MonitorImpl(){}; - -    };    -#endif -     -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/Thread.cpp index d4d073cac6..9bbc2f8131 100644 --- a/cpp/src/qpid/concurrent/APRThread.cpp +++ b/cpp/src/qpid/concurrent/Thread.cpp @@ -16,7 +16,7 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThread.h" +#include "qpid/concurrent/Thread.h"  #include "apr-1/apr_portable.h"  using namespace qpid::concurrent; @@ -27,24 +27,24 @@ void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){      return NULL;  }  -APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} -APRThread::~APRThread(){ +Thread::~Thread(){  } -void APRThread::start(){ +void Thread::start(){      CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));  } -void APRThread::join(){ +void Thread::join(){      apr_status_t status;      if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));  } -void APRThread::interrupt(){ +void Thread::interrupt(){      if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));  } -unsigned int qpid::concurrent::APRThread::currentThread(){ +unsigned int qpid::concurrent::Thread::currentThread(){      return apr_os_thread_current();  } diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h index 6bd2a379ce..d18bc153bf 100644 --- a/cpp/src/qpid/concurrent/Thread.h +++ b/cpp/src/qpid/concurrent/Thread.h @@ -18,16 +18,27 @@  #ifndef _Thread_  #define _Thread_ +#include "apr-1/apr_thread_proc.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/Thread.h" +  namespace qpid {  namespace concurrent {      class Thread      { +	const Runnable* runnable; +	apr_pool_t* pool; +	apr_thread_t* runner; +      public: -        virtual ~Thread(){} -	virtual void start() = 0; -	virtual void join() = 0; -	virtual void interrupt() = 0; +	Thread(apr_pool_t* pool, Runnable* runnable); +	virtual ~Thread(); +	virtual void start(); +	virtual void join(); +	virtual void interrupt(); +        static unsigned int currentThread();      };  } diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp index 1c99a3da33..b20f9f2b04 100644 --- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp +++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp @@ -16,20 +16,20 @@   *   */  #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/ThreadFactory.h"  using namespace qpid::concurrent; -APRThreadFactory::APRThreadFactory(){ +ThreadFactory::ThreadFactory(){      APRBase::increment();      CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));  } -APRThreadFactory::~APRThreadFactory(){ +ThreadFactory::~ThreadFactory(){      apr_pool_destroy(pool);      APRBase::decrement();  } -Thread* APRThreadFactory::create(Runnable* runnable){ -    return new APRThread(pool, runnable); +Thread* ThreadFactory::create(Runnable* runnable){ +    return new Thread(pool, runnable);  } diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h index 60c8ad2556..572419cae6 100644 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ b/cpp/src/qpid/concurrent/ThreadFactory.h @@ -18,7 +18,11 @@  #ifndef _ThreadFactory_  #define _ThreadFactory_ +#include "apr-1/apr_thread_proc.h" + +#include "qpid/concurrent/Thread.h"  #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h"  #include "qpid/concurrent/Runnable.h"  namespace qpid { @@ -26,9 +30,11 @@ namespace concurrent {      class ThreadFactory      { +	apr_pool_t* pool;      public: -        virtual ~ThreadFactory(){} -	virtual Thread* create(Runnable* runnable) = 0; +	ThreadFactory(); +	virtual ~ThreadFactory(); +	virtual Thread* create(Runnable* runnable);      };  } diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h deleted file mode 100644 index 352b77ac21..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactoryImpl_ -#define _ThreadFactoryImpl_ - - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRThreadFactory.h" -#else -#include "qpid/concurrent/LThreadFactory.h" -#endif - - -namespace qpid { -namespace concurrent { - - -#ifdef _USE_APR_IO_ -    class ThreadFactoryImpl : public virtual APRThreadFactory -    { -    public: -	ThreadFactoryImpl(): APRThreadFactory() {}; -	virtual ~ThreadFactoryImpl() {}; -    }; -#else -    class ThreadFactoryImpl : public virtual LThreadFactory -    { -    public: -	ThreadFactoryImpl(): LThreadFactory() {}; -	virtual ~ThreadFactoryImpl() {}; -    }; -#endif -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp index 3222c71b0c..5da19745a7 100644 --- a/cpp/src/qpid/concurrent/APRThreadPool.cpp +++ b/cpp/src/qpid/concurrent/ThreadPool.cpp @@ -15,33 +15,33 @@   * limitations under the License.   *   */ -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h"  #include "qpid/QpidError.h"  #include <iostream>  using namespace qpid::concurrent; -APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){ +ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){      worker = new Worker(this);  } -APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) :     deleteFactory(false), size(_size), factory(_factory), running(false){ +ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) :     deleteFactory(false), size(_size), factory(_factory), running(false){      worker = new Worker(this);  } -APRThreadPool::~APRThreadPool(){ +ThreadPool::~ThreadPool(){      if(deleteFactory) delete factory;  } -void APRThreadPool::addTask(Runnable* task){ +void ThreadPool::addTask(Runnable* task){      lock.acquire();      tasks.push(task);      lock.notifyAll();      lock.release();  } -void APRThreadPool::runTask(){ +void ThreadPool::runTask(){      lock.acquire();      while(tasks.empty()){          lock.wait(); @@ -56,7 +56,7 @@ void APRThreadPool::runTask(){      }  } -void APRThreadPool::start(){ +void ThreadPool::start(){      if(!running){          running = true;          for(int i = 0; i < size; i++){ @@ -67,7 +67,7 @@ void APRThreadPool::start(){      }  } -void APRThreadPool::stop(){ +void ThreadPool::stop(){      if(!running){          running = false;          lock.acquire(); diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h index 925faa76de..11f0cc364f 100644 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ b/cpp/src/qpid/concurrent/ThreadPool.h @@ -18,7 +18,12 @@  #ifndef _ThreadPool_  #define _ThreadPool_ +#include <queue> +#include <vector> +#include "qpid/concurrent/Monitor.h"  #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h"  #include "qpid/concurrent/Runnable.h"  namespace qpid { @@ -26,11 +31,33 @@ namespace concurrent {      class ThreadPool      { +        class Worker : public virtual Runnable{ +            ThreadPool* pool; +        public: +            inline Worker(ThreadPool* _pool) : pool(_pool){} +            inline virtual void run(){ +                while(pool->running){ +                    pool->runTask(); +                } +            } +        }; +        const bool deleteFactory; +        const int size; +        ThreadFactory* factory; +        Monitor lock;  +        std::vector<Thread*> threads; +        std::queue<Runnable*> tasks; +        Worker* worker; +        volatile bool running; + +        void runTask();      public: -        virtual void start() = 0; -        virtual void stop() = 0; -	virtual void addTask(Runnable* runnable) = 0; -        virtual ~ThreadPool(){} +        ThreadPool(int size); +        ThreadPool(int size, ThreadFactory* factory); +        virtual void start(); +        virtual void stop(); +	virtual void addTask(Runnable* task); +        virtual ~ThreadPool();      };  } diff --git a/cpp/src/qpid/framing/InputHandler.cpp b/cpp/src/qpid/framing/InputHandler.cpp deleted file mode 100644 index accf68421a..0000000000 --- a/cpp/src/qpid/framing/InputHandler.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/InputHandler.h" - -qpid::framing::InputHandler::~InputHandler() {} diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h index e2ad545993..8f56d176b8 100644 --- a/cpp/src/qpid/framing/InputHandler.h +++ b/cpp/src/qpid/framing/InputHandler.h @@ -1,3 +1,5 @@ +#ifndef _InputHandler_ +#define _InputHandler_  /*   *   * Copyright (c) 2006 The Apache Software Foundation @@ -15,24 +17,19 @@   * limitations under the License.   *   */ -#include <string> - -#ifndef _InputHandler_ -#define _InputHandler_ +#include <qpid/SharedObject.h>  #include "qpid/framing/AMQFrame.h"  namespace qpid {  namespace framing { -    class InputHandler{ -    public: -        virtual ~InputHandler(); -	virtual void received(AMQFrame* frame) = 0; -    }; +class InputHandler : public qpid::SharedObject<InputHandler> { +  public: +    virtual void received(AMQFrame* frame) = 0; +}; -} -} +}}  #endif diff --git a/cpp/src/qpid/framing/OutputHandler.cpp b/cpp/src/qpid/framing/OutputHandler.cpp deleted file mode 100644 index 22de39b82a..0000000000 --- a/cpp/src/qpid/framing/OutputHandler.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/OutputHandler.h" - -qpid::framing::OutputHandler::~OutputHandler() {} diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h index ed38a321e5..16fb7e8afb 100644 --- a/cpp/src/qpid/framing/OutputHandler.h +++ b/cpp/src/qpid/framing/OutputHandler.h @@ -1,3 +1,6 @@ +#ifndef _OutputHandler_ +#define _OutputHandler_ +  /*   *   * Copyright (c) 2006 The Apache Software Foundation @@ -15,24 +18,18 @@   * limitations under the License.   *   */ -#include <string> - -#ifndef _OutputHandler_ -#define _OutputHandler_ - +#include <qpid/SharedObject.h>  #include "qpid/framing/AMQFrame.h"  namespace qpid {  namespace framing { -    class OutputHandler{ -    public: -        virtual ~OutputHandler(); -	virtual void send(AMQFrame* frame) = 0; -    }; +class OutputHandler : public qpid::SharedObject<OutputHandler> { +  public: +    virtual void send(AMQFrame* frame) = 0; +}; -} -} +}}  #endif diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h deleted file mode 100644 index c835f30056..0000000000 --- a/cpp/src/qpid/io/APRConnector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnector_ -#define _APRConnector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/APRMonitor.h" - -namespace qpid { -namespace io { - -    class APRConnector : public virtual qpid::framing::OutputHandler,  -	public virtual Connector, -	private virtual qpid::concurrent::Runnable -    { -        const bool debug; -	const int receive_buffer_size; -	const int send_buffer_size; - -	bool closed; - -        apr_time_t lastIn; -        apr_time_t lastOut; -        apr_interval_time_t timeout; -        u_int32_t idleIn; -        u_int32_t idleOut; - -        TimeoutHandler* timeoutHandler; -        ShutdownHandler* shutdownHandler; -	qpid::framing::InputHandler* input; -	qpid::framing::InitiationHandler* initialiser; -	qpid::framing::OutputHandler* output; -	 -	qpid::framing::Buffer inbuf; -	qpid::framing::Buffer outbuf; - -        qpid::concurrent::APRMonitor* writeLock; -	qpid::concurrent::ThreadFactory* threadFactory; -	qpid::concurrent::Thread* receiver; - -	apr_pool_t* pool; -	apr_socket_t* socket; - -        void checkIdle(apr_status_t status); -	void writeBlock(qpid::framing::AMQDataBlock* data); -	void writeToSocket(char* data, size_t available); -        void setSocketTimeout(); - -	void run(); - -    public: -	APRConnector(bool debug = false, u_int32_t buffer_size = 1024); -	virtual ~APRConnector(); -	virtual void connect(const std::string& host, int port); -	virtual void init(qpid::framing::ProtocolInitiation* header); -	virtual void close(); -	virtual void setInputHandler(qpid::framing::InputHandler* handler); -	virtual void setTimeoutHandler(TimeoutHandler* handler); -	virtual void setShutdownHandler(ShutdownHandler* handler); -	virtual qpid::framing::OutputHandler* getOutputHandler(); -	virtual void send(qpid::framing::AMQFrame* frame); -        virtual void setReadTimeout(u_int16_t timeout); -        virtual void setWriteTimeout(u_int16_t timeout); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/io/APRPool.cpp index 70e99b9807..edd434f16c 100644 --- a/cpp/src/qpid/concurrent/LMonitor.h +++ b/cpp/src/qpid/io/APRPool.cpp @@ -15,30 +15,25 @@   * limitations under the License.   *   */ -#ifndef _LMonitor_ -#define _LMonitor_ -/* Native Linux Monitor - Based of Kernel patch 19/20 */ +#include "APRPool.h" +#include "qpid/concurrent/APRBase.h" +#include <boost/pool/singleton_pool.hpp> -#include "qpid/concurrent/Monitor.h" +using namespace qpid::io; +using namespace qpid::concurrent; -namespace qpid { -namespace concurrent { - -    class LMonitor : public virtual Monitor  -    { - -    public: -	LMonitor(); -	virtual ~LMonitor(); -	virtual void wait(); -	virtual void notify(); -	virtual void notifyAll(); -	virtual void acquire(); -	virtual void release(); -    }; +APRPool::APRPool(){ +    APRBase::increment(); +    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));  } + +APRPool::~APRPool(){ +    apr_pool_destroy(pool); +    APRBase::decrement();  } +apr_pool_t* APRPool::get() { +    return boost::details::pool::singleton_default<APRPool>::instance().pool; +} -#endif diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/io/APRPool.h index 4a573d1bd1..063eedf1ee 100644 --- a/cpp/src/qpid/concurrent/LThreadFactory.h +++ b/cpp/src/qpid/io/APRPool.h @@ -1,3 +1,6 @@ +#ifndef _APRPool_ +#define _APRPool_ +  /*   *   * Copyright (c) 2006 The Apache Software Foundation @@ -15,23 +18,30 @@   * limitations under the License.   *   */ -#ifndef _LAPRThreadFactory_ -#define _LAPRThreadFactory_ - +#include <boost/noncopyable.hpp> +#include <apr-1/apr_pools.h>  namespace qpid { -namespace concurrent { +namespace io { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { +  public: +    APRPool(); +    ~APRPool(); + +    /** Get singleton instance */ +    static apr_pool_t* get(); + +  private: +    apr_pool_t* pool; +}; + +}} + -    class LThreadFactory  -    { -    public: -	LThreadFactory(); -	virtual ~LThreadFactory(); -	virtual Thread* create(Runnable* runnable); -    }; -} -} -#endif +#endif  /*!_APRPool_*/ diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp index 6b76bd4da2..f95d9448cf 100644 --- a/cpp/src/qpid/io/Acceptor.cpp +++ b/cpp/src/qpid/io/Acceptor.cpp @@ -15,7 +15,64 @@   * limitations under the License.   *   */ -  #include "qpid/io/Acceptor.h" +#include "qpid/concurrent/APRBase.h" +#include "APRPool.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : +    port(port_), +    processor(APRPool::get(), threads, 1000, 5000000) +{ +    apr_sockaddr_t* address; +    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); +    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); +    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); +    CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); +    CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { +    apr_sockaddr_t* address; +    CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); +    return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { +    running = true; +    processor.start(); +    std::cout << "Listening on port " << getPort() << "..." << std::endl; +    while(running){ +        apr_socket_t* client; +        apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); +        if(status == APR_SUCCESS){ +            //make this socket non-blocking: +            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); +            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); +            LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); +            session->init(factory->create(session)); +        }else{ +            running = false; +            if(status != APR_EINTR){ +                std::cout << "ERROR: " << get_desc(status) << std::endl; +            } +        } +    } +    shutdown(); +} + +void Acceptor::shutdown() { +    // TODO aconway 2006-10-12: Cleanup, this is not thread safe. +    if (running) { +        running = false; +        processor.stop(); +        CHECK_APR_SUCCESS(apr_socket_close(socket)); +    } +} + -qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h index a7f7ad66f0..bc189f7f6e 100644 --- a/cpp/src/qpid/io/Acceptor.h +++ b/cpp/src/qpid/io/Acceptor.h @@ -15,36 +15,43 @@   * limitations under the License.   *   */ -#ifndef _Acceptor_ -#define _Acceptor_ - +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/io/Acceptor.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/io/LFProcessor.h" +#include "qpid/io/LFSessionContext.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/SessionContext.h"  #include "qpid/io/SessionHandlerFactory.h" +#include "qpid/concurrent/Thread.h" +#include <qpid/SharedObject.h>  namespace qpid {  namespace io { -    class Acceptor -    { -    public: -        /** -         * Bind to port. -         * @param port Port to bind to, 0 to bind to dynamically chosen port. -         * @return The local bound port. -         */ -        virtual int16_t bind(int16_t port) = 0; - -        /** -         * Run the acceptor. -         */ -        virtual void run(SessionHandlerFactory* factory) = 0; - -        /** -         * Shut down the acceptor. -         */ -        virtual void shutdown() = 0; -         -	virtual ~Acceptor(); -    }; +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject<Acceptor> +{ +  public: +    Acceptor(int16_t port, int backlog, int threads); +    virtual int16_t getPort() const; +    virtual void run(SessionHandlerFactory* factory); +    virtual void shutdown(); + +  private: +    int16_t port; +    LFProcessor processor; +    apr_socket_t* socket; +    volatile bool running; +};  }  } diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp deleted file mode 100644 index 0e1fc535a2..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : -    debug(_debug), -    threadFactory(new APRThreadFactory()), -    connectionBacklog(c) -{ -    APRBase::increment(); -    CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); -} - -int16_t BlockingAPRAcceptor::bind(int16_t _port){ -    apr_sockaddr_t* address; -    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); -    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); -    CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); -    CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); -    return getPort(); -} - -int16_t BlockingAPRAcceptor::getPort() const { -    apr_sockaddr_t* address; -    CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); -    return address->port; -} - -void BlockingAPRAcceptor::run(SessionHandlerFactory* factory)  -{ -    running = true; -    std::cout << "Listening on port " << getPort() << "..." << std::endl; -    while(running){ -        apr_socket_t* client; -        apr_status_t status = apr_socket_accept(&client, socket, apr_pool); -        if(status == APR_SUCCESS){ -            //configure socket: -            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); -             -            BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); -            session->init(factory->create(session)); -            sessions.push_back(session); -        }else{ -            running = false; -            if(status != APR_EINTR){ -                std::cout << "ERROR: " << get_desc(status) << std::endl; -            } -        } -    } -    shutdown(); -} - -void BlockingAPRAcceptor::shutdown()  -{ -    // TODO aconway 2006-10-12: Not thread safe. -    if (running)  -    { -        running = false; -        apr_socket_close(socket); // Don't check, exception safety. -        for(iterator i = sessions.begin(); i < sessions.end(); i++){ -            (*i)->shutdown(); -        } -    } -} - -BlockingAPRAcceptor::~BlockingAPRAcceptor(){ -    delete threadFactory; -    apr_pool_destroy(apr_pool); -    APRBase::decrement(); -} - - -void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ -    sessions.erase(find(sessions.begin(), sessions.end(), session)); -} - diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h deleted file mode 100644 index a3042605aa..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRAcceptor_ -#define _BlockingAPRAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" - -namespace qpid { -namespace io { - -    class BlockingAPRAcceptor : public virtual Acceptor -    { -        typedef std::vector<BlockingAPRSessionContext*>::iterator iterator; - -        const bool debug; -        apr_pool_t* apr_pool; -        qpid::concurrent::ThreadFactory* threadFactory; -        std::vector<BlockingAPRSessionContext*> sessions; -	apr_socket_t* socket; -        const int connectionBacklog; -        volatile bool running; -         -    public: -	BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); -        virtual int16_t bind(int16_t port); -        virtual int16_t getPort() const; -        virtual void run(SessionHandlerFactory* factory); -        virtual void shutdown(); -	virtual ~BlockingAPRAcceptor(); -        void closed(BlockingAPRSessionContext* session); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp deleted file mode 100644 index 88e6b6b0fc..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <assert.h> -#include <iostream> -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - - -BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,  -                                                     ThreadFactory* factory,  -                                                     BlockingAPRAcceptor* _acceptor, -                                                     bool _debug)  -    : socket(_socket),  -      debug(_debug), -      handler(0), -      acceptor(_acceptor), -      inbuf(65536), -      outbuf(65536), -      closed(false){ - -    reader = new Reader(this); -    writer = new Writer(this); - -    rThread = factory->create(reader); -    wThread = factory->create(writer); -}             - -BlockingAPRSessionContext::~BlockingAPRSessionContext(){ -    delete reader; -    delete writer; - -    delete rThread; -    delete wThread; - -    delete handler; -} - -void BlockingAPRSessionContext::read(){ -    try{ -        bool initiated(false); -	while(!closed){ -	    apr_size_t bytes(inbuf.available()); -            if(bytes < 1){ -                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); -            } -	    apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); -            if(APR_STATUS_IS_TIMEUP(s)){ -                //timed out, check closed on loop -            }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ -                closed = true; -            }else{ -		inbuf.move(bytes); -		inbuf.flip(); -		 -                if(!initiated){ -                    ProtocolInitiation* protocolInit = new ProtocolInitiation(); -                    if(protocolInit->decode(inbuf)){ -                        handler->initiated(protocolInit); -                        if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;  -                        initiated = true; -                    } -                }else{ -                    AMQFrame frame; -                    while(frame.decode(inbuf)){ -                        if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;  -                        handler->received(&frame); -                    } -                } -                //need to compact buffer to preserve any 'extra' data -                inbuf.compact(); -	    } -	} - -        //close socket  -    }catch(qpid::QpidError error){ -	std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; -    } -} - -void BlockingAPRSessionContext::write(){ -    while(!closed){ -        //get next frame -        outlock.acquire(); -        while(outframes.empty() && !closed){ -            outlock.wait(); -        } -        if(!closed){ -            AMQFrame* frame = outframes.front();                 -            outframes.pop(); -            outlock.release(); -             -            //encode -            frame->encode(outbuf); -            if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;  -            delete frame; -            outbuf.flip(); -             -            //write from outbuf to socket -            char* data = outbuf.start(); -            const int available = outbuf.available(); -            int written = 0; -            apr_size_t bytes = available; -            while(available > written){ -                apr_socket_send(socket, data + written, &bytes); -                written += bytes; -                bytes = available - written; -            } -            outbuf.clear(); -        }else{ -            outlock.release(); -        } -    } -} - -void BlockingAPRSessionContext::send(AMQFrame* frame){ -    if(!closed){ -        outlock.acquire(); -        bool was_empty(outframes.empty()); -        outframes.push(frame); -        if(was_empty){ -            outlock.notify(); -        } -        outlock.release(); -    }else{ -        std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;  -    } -} - -void BlockingAPRSessionContext::init(SessionHandler* _handler){ -    handler = _handler; -    rThread->start(); -    wThread->start(); -} - -void BlockingAPRSessionContext::close(){ -    closed = true; -    wThread->join(); -    CHECK_APR_SUCCESS(apr_socket_close(socket)); -    if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;  -    handler->closed(); -    acceptor->closed(this); -    delete this; -} - -void BlockingAPRSessionContext::shutdown(){ -    closed = true; -    outlock.acquire(); -    outlock.notify(); -    outlock.release(); - -    wThread->join(); -    CHECK_APR_SUCCESS(apr_socket_close(socket)); -    rThread->join(); -    handler->closed(); -    delete this; -} diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h deleted file mode 100644 index c06142ace5..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRSessionContext_ -#define _BlockingAPRSessionContext_ - -#include <queue> -#include <vector> - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/framing/Buffer.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" - -namespace qpid { -namespace io { - -    class BlockingAPRAcceptor; - -    class BlockingAPRSessionContext : public virtual SessionContext -    { -        class Reader : public virtual qpid::concurrent::Runnable{ -            BlockingAPRSessionContext* parent; -        public: -            inline Reader(BlockingAPRSessionContext* p) : parent(p){} -            inline virtual void run(){ parent->read(); } -            inline virtual ~Reader(){} -        }; - -        class Writer : public virtual qpid::concurrent::Runnable{ -            BlockingAPRSessionContext* parent; -        public: -            inline Writer(BlockingAPRSessionContext* p) : parent(p){} -            inline virtual void run(){ parent->write(); } -            inline virtual ~Writer(){} -        };         - -        apr_socket_t* socket; -        const bool debug; -        SessionHandler* handler; -        BlockingAPRAcceptor* acceptor; -        std::queue<qpid::framing::AMQFrame*> outframes; -        qpid::framing::Buffer inbuf; -        qpid::framing::Buffer outbuf; -        qpid::concurrent::APRMonitor outlock; -        Reader* reader; -        Writer* writer; -        qpid::concurrent::Thread* rThread; -        qpid::concurrent::Thread* wThread; - -        volatile bool closed; - -        void read(); -        void write();     -    public: -        BlockingAPRSessionContext(apr_socket_t* socket,  -                                  qpid::concurrent::ThreadFactory* factory,  -                                  BlockingAPRAcceptor* acceptor,  -                                  bool debug = false); -        ~BlockingAPRSessionContext(); -        virtual void send(qpid::framing::AMQFrame* frame); -        virtual void close(); -        void shutdown(); -        void init(SessionHandler* handler); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/Connector.cpp index 91cf01c842..ca487deb86 100644 --- a/cpp/src/qpid/io/APRConnector.cpp +++ b/cpp/src/qpid/io/Connector.cpp @@ -17,8 +17,8 @@   */  #include <iostream>  #include "qpid/concurrent/APRBase.h" -#include "qpid/io/APRConnector.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/ThreadFactory.h"  #include "qpid/QpidError.h"  using namespace qpid::io; @@ -26,7 +26,7 @@ using namespace qpid::concurrent;  using namespace qpid::framing;  using qpid::QpidError; -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : +Connector::Connector(bool _debug, u_int32_t buffer_size) :      debug(_debug),       receive_buffer_size(buffer_size),      send_buffer_size(buffer_size), @@ -44,11 +44,11 @@ APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :      CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));      CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); -    threadFactory = new APRThreadFactory(); -    writeLock = new APRMonitor(); +    threadFactory = new ThreadFactory(); +    writeLock = new Monitor();  } -APRConnector::~APRConnector(){ +Connector::~Connector(){      delete receiver;      delete writeLock;      delete threadFactory; @@ -57,7 +57,7 @@ APRConnector::~APRConnector(){      APRBase::decrement();  } -void APRConnector::connect(const std::string& host, int port){ +void Connector::connect(const std::string& host, int port){      apr_sockaddr_t* address;      CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));      CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); @@ -67,36 +67,36 @@ void APRConnector::connect(const std::string& host, int port){      receiver->start();  } -void APRConnector::init(ProtocolInitiation* header){ +void Connector::init(ProtocolInitiation* header){      writeBlock(header);      delete header;  } -void APRConnector::close(){ +void Connector::close(){      closed = true;      CHECK_APR_SUCCESS(apr_socket_close(socket));      receiver->join();  } -void APRConnector::setInputHandler(InputHandler* handler){ +void Connector::setInputHandler(InputHandler* handler){      input = handler;  } -void APRConnector::setShutdownHandler(ShutdownHandler* handler){ +void Connector::setShutdownHandler(ShutdownHandler* handler){      shutdownHandler = handler;  } -OutputHandler* APRConnector::getOutputHandler(){  +OutputHandler* Connector::getOutputHandler(){       return this;   } -void APRConnector::send(AMQFrame* frame){ +void Connector::send(AMQFrame* frame){      writeBlock(frame);          if(debug) std::cout << "SENT: " << *frame << std::endl;       delete frame;  } -void APRConnector::writeBlock(AMQDataBlock* data){ +void Connector::writeBlock(AMQDataBlock* data){      writeLock->acquire();      data->encode(outbuf); @@ -107,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){      writeLock->release();  } -void APRConnector::writeToSocket(char* data, size_t available){ +void Connector::writeToSocket(char* data, size_t available){      apr_size_t bytes(available);      apr_size_t written(0);      while(written < available && !closed){ @@ -124,7 +124,7 @@ void APRConnector::writeToSocket(char* data, size_t available){      }  } -void APRConnector::checkIdle(apr_status_t status){ +void Connector::checkIdle(apr_status_t status){      if(timeoutHandler){          apr_time_t now = apr_time_as_msec(apr_time_now());          if(APR_STATUS_IS_TIMEUP(status)){ @@ -144,7 +144,7 @@ void APRConnector::checkIdle(apr_status_t status){      }  } -void APRConnector::setReadTimeout(u_int16_t t){ +void Connector::setReadTimeout(u_int16_t t){      idleIn = t * 1000;//t is in secs      if(idleIn && (!timeout || idleIn < timeout)){          timeout = idleIn; @@ -153,7 +153,7 @@ void APRConnector::setReadTimeout(u_int16_t t){  } -void APRConnector::setWriteTimeout(u_int16_t t){ +void Connector::setWriteTimeout(u_int16_t t){      idleOut = t * 1000;//t is in secs      if(idleOut && (!timeout || idleOut < timeout)){          timeout = idleOut; @@ -161,7 +161,7 @@ void APRConnector::setWriteTimeout(u_int16_t t){      }  } -void APRConnector::setSocketTimeout(){ +void Connector::setSocketTimeout(){      //interval is in microseconds, timeout in milliseconds      //want the interval to be a bit shorter than the timeout, hence multiply      //by 800 rather than 1000. @@ -169,11 +169,11 @@ void APRConnector::setSocketTimeout(){      apr_socket_timeout_set(socket, interval);  } -void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ +void Connector::setTimeoutHandler(TimeoutHandler* handler){      timeoutHandler = handler;  } -void APRConnector::run(){ +void Connector::run(){      try{  	while(!closed){  	    apr_size_t bytes(inbuf.available()); diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h index d0a2f470a8..7c52f7e87b 100644 --- a/cpp/src/qpid/io/Connector.h +++ b/cpp/src/qpid/io/Connector.h @@ -18,35 +18,74 @@  #ifndef _Connector_  #define _Connector_ +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" +  #include "qpid/framing/InputHandler.h"  #include "qpid/framing/OutputHandler.h"  #include "qpid/framing/InitiationHandler.h"  #include "qpid/framing/ProtocolInitiation.h"  #include "qpid/io/ShutdownHandler.h"  #include "qpid/io/TimeoutHandler.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/Monitor.h"  namespace qpid {  namespace io { -    class Connector +    class Connector : public virtual qpid::framing::OutputHandler,  +	private virtual qpid::concurrent::Runnable      { +        const bool debug; +	const int receive_buffer_size; +	const int send_buffer_size; + +	bool closed; + +        apr_time_t lastIn; +        apr_time_t lastOut; +        apr_interval_time_t timeout; +        u_int32_t idleIn; +        u_int32_t idleOut; + +        TimeoutHandler* timeoutHandler; +        ShutdownHandler* shutdownHandler; +	qpid::framing::InputHandler* input; +	qpid::framing::InitiationHandler* initialiser; +	qpid::framing::OutputHandler* output; +	 +	qpid::framing::Buffer inbuf; +	qpid::framing::Buffer outbuf; + +        qpid::concurrent::Monitor* writeLock; +	qpid::concurrent::ThreadFactory* threadFactory; +	qpid::concurrent::Thread* receiver; + +	apr_pool_t* pool; +	apr_socket_t* socket; + +        void checkIdle(apr_status_t status); +	void writeBlock(qpid::framing::AMQDataBlock* data); +	void writeToSocket(char* data, size_t available); +        void setSocketTimeout(); + +	void run(); +      public: -	virtual void connect(const std::string& host, int port) = 0; -	virtual void init(qpid::framing::ProtocolInitiation* header) = 0; -	virtual void close() = 0; -	virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0; -	virtual void setTimeoutHandler(TimeoutHandler* handler) = 0; -	virtual void setShutdownHandler(ShutdownHandler* handler) = 0; -	virtual qpid::framing::OutputHandler* getOutputHandler() = 0; -        /** -         * Set the timeout for reads, in secs. -         */ -        virtual void setReadTimeout(u_int16_t timeout) = 0; -        /** -         * Set the timeout for writes, in secs. -         */ -        virtual void setWriteTimeout(u_int16_t timeout) = 0; -	virtual ~Connector(){} +	Connector(bool debug = false, u_int32_t buffer_size = 1024); +	virtual ~Connector(); +	virtual void connect(const std::string& host, int port); +	virtual void init(qpid::framing::ProtocolInitiation* header); +	virtual void close(); +	virtual void setInputHandler(qpid::framing::InputHandler* handler); +	virtual void setTimeoutHandler(TimeoutHandler* handler); +	virtual void setShutdownHandler(ShutdownHandler* handler); +	virtual qpid::framing::OutputHandler* getOutputHandler(); +	virtual void send(qpid::framing::AMQFrame* frame); +        virtual void setReadTimeout(u_int16_t timeout); +        virtual void setWriteTimeout(u_int16_t timeout);      };  } diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h deleted file mode 100644 index 55dcf7a2d4..0000000000 --- a/cpp/src/qpid/io/ConnectorImpl.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnectorImpl_ -#define _APRConnectorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/io/APRConnector.h" -#else -#include "qpid/io/LConnector.h" -#endif - -namespace qpid { -namespace io { - -#ifdef _USE_APR_IO_ -    class ConnectorImpl : public virtual APRConnector -    { -         -    public: -	ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){}; -	virtual ~ConnectorImpl(){}; -    }; -#else -    class ConnectorImpl : public virtual LConnector -    { -         -    public: -	ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){}; -	virtual ~ConnectorImpl(){}; -    }; - -#endif - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/LConnector.h deleted file mode 100644 index 5fc86597bd..0000000000 --- a/cpp/src/qpid/io/LConnector.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LConnector_ -#define _LConnector_ - - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" - -namespace qpid { -namespace io { - -    class LConnector : public virtual qpid::framing::OutputHandler,  -	public virtual Connector, -	private virtual qpid::concurrent::Runnable -    { - -    public: -	LConnector(bool debug = false, u_int32_t buffer_size = 1024){}; -	virtual ~LConnector(){}; - -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp deleted file mode 100644 index 7e51a550af..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFAcceptor.h" -#include "qpid/concurrent/APRBase.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : -    processor(aprPool.pool, worker_threads, 1000, 5000000), -    max_connections_per_processor(m),  -    debug(_debug), -    connectionBacklog(c) -{ } - - -int16_t LFAcceptor::bind(int16_t _port){ -    apr_sockaddr_t* address; -    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); -    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); -    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); -    CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); -    CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); -    return getPort(); -} - -int16_t LFAcceptor::getPort() const { -    apr_sockaddr_t* address; -    CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); -    return address->port; -} - -void LFAcceptor::run(SessionHandlerFactory* factory) { -    running = true; -    processor.start(); -    std::cout << "Listening on port " << getPort() << "..." << std::endl; -    while(running){ -        apr_socket_t* client; -        apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); -        if(status == APR_SUCCESS){ -            //make this socket non-blocking: -            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); -            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); -            LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); -            session->init(factory->create(session)); -        }else{ -            running = false; -            if(status != APR_EINTR){ -                std::cout << "ERROR: " << get_desc(status) << std::endl; -            } -        } -    } -    shutdown(); -} - -void LFAcceptor::shutdown() { -    // TODO aconway 2006-10-12: Cleanup, this is not thread safe. -    if (running) { -        running = false; -        processor.stop(); -        CHECK_APR_SUCCESS(apr_socket_close(socket)); -    } -} - - -LFAcceptor::~LFAcceptor(){} - -LFAcceptor::APRPool::APRPool(){ -    APRBase::increment(); -    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -LFAcceptor::APRPool::~APRPool(){ -    apr_pool_destroy(pool); -    APRBase::decrement(); -} diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h deleted file mode 100644 index 35a556d500..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *    http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace io { - -    class LFAcceptor : public virtual Acceptor -    { -        class APRPool{ -        public: -            apr_pool_t* pool; -            APRPool(); -            ~APRPool(); -        }; - -        APRPool aprPool; -        LFProcessor processor; -        apr_socket_t* socket; -        const int max_connections_per_processor; -        const bool debug; -        const int connectionBacklog; - -        volatile bool running; - -    public: -	LFAcceptor(bool debug = false,  -                   int connectionBacklog = 10,  -                   int worker_threads = 5,  -                   int max_connections_per_processor = 500); -        virtual int16_t bind(int16_t port); -        virtual int16_t getPort() const; -        virtual void run(SessionHandlerFactory* factory); -        virtual void shutdown(); -	virtual ~LFAcceptor(); -    }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h index 16c789f0ff..5b61f444af 100644 --- a/cpp/src/qpid/io/LFProcessor.h +++ b/cpp/src/qpid/io/LFProcessor.h @@ -21,8 +21,8 @@  #include "apr-1/apr_poll.h"  #include <iostream>  #include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h"  #include "qpid/concurrent/Runnable.h"  namespace qpid { @@ -50,9 +50,9 @@ namespace io {          const int workerCount;          bool hasLeader;          qpid::concurrent::Thread** const workers; -        qpid::concurrent::APRMonitor leadLock; -        qpid::concurrent::APRMonitor countLock; -        qpid::concurrent::APRThreadFactory factory; +        qpid::concurrent::Monitor leadLock; +        qpid::concurrent::Monitor countLock; +        qpid::concurrent::ThreadFactory factory;          std::vector<LFSessionContext*> sessions;          volatile bool stopped; diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp index 6d6d786841..ca1e6431a6 100644 --- a/cpp/src/qpid/io/LFSessionContext.cpp +++ b/cpp/src/qpid/io/LFSessionContext.cpp @@ -54,7 +54,7 @@ LFSessionContext::~LFSessionContext(){  void LFSessionContext::read(){      assert(!reading);           // No concurrent read.  -    reading = APRThread::currentThread(); +    reading = Thread::currentThread();      socket.read(in);      in.flip(); @@ -79,7 +79,7 @@ void LFSessionContext::read(){  void LFSessionContext::write(){      assert(!writing);           // No concurrent writes. -    writing = APRThread::currentThread(); +    writing = Thread::currentThread();      bool done = isClosed();      while(!done){ @@ -186,4 +186,4 @@ void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){      logLock.release();  } -APRMonitor LFSessionContext::logLock; +Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h index 9406bb97b6..8d30b54204 100644 --- a/cpp/src/qpid/io/LFSessionContext.h +++ b/cpp/src/qpid/io/LFSessionContext.h @@ -25,7 +25,7 @@  #include "apr-1/apr_time.h"  #include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h"  #include "qpid/io/APRSocket.h"  #include "qpid/framing/Buffer.h"  #include "qpid/io/LFProcessor.h" @@ -51,7 +51,7 @@ namespace io {          apr_pollfd_t fd;          std::queue<qpid::framing::AMQFrame*> framesToWrite; -        qpid::concurrent::APRMonitor writeLock; +        qpid::concurrent::Monitor writeLock;          bool processing;          bool closing; @@ -60,7 +60,7 @@ namespace io {          volatile unsigned int reading;          volatile unsigned int writing; -        static qpid::concurrent::APRMonitor logLock; +        static qpid::concurrent::Monitor logLock;          void log(const std::string& desc, qpid::framing::AMQFrame* const frame);      public: diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/doxygen_summary.h index e6b17451e4..1086f65f63 100644 --- a/cpp/src/qpid/io/SessionManager.h +++ b/cpp/src/qpid/io/doxygen_summary.h @@ -1,3 +1,6 @@ +#ifndef _doxygen_summary_ +#define _doxygen_summary_ +  /*   *   * Copyright (c) 2006 The Apache Software Foundation @@ -15,26 +18,17 @@   * limitations under the License.   *   */ -#ifndef _SessionManager_ -#define _SessionManager_ - -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" - -namespace qpid { -namespace io { - -    class SessionManager -    { -    public: -        virtual SessionHandler* init(SessionContext* ctxt) = 0; -        virtual void close(SessionContext* ctxt) = 0;         -        virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0; -	virtual ~SessionManager(){} -    }; -} -} +// No code just a doxygen comment for the namespace - -#endif +/** \namspace qpid::io + * IO classes used by client and broker. + * + * This namespace contains platform-neutral classes.  Platform + * specific classes are in a sub-namespace named after the + * platform. At build time the appropriate platform classes are + * imported into this namespace so other code does not need to be awre + * of the difference. + *  + */ +#endif  /*!_doxygen_summary_*/ diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp index e93676513a..40be6485f2 100644 --- a/cpp/src/qpidd.cpp +++ b/cpp/src/qpidd.cpp @@ -37,7 +37,7 @@ int main(int argc, char** argv)              config.usage();          }else{              apr_signal(SIGINT, handle_signal); -            Broker::shared_ptr broker = Broker::create(config); +            Broker::SharedPtr broker = Broker::create(config);              broker->run();          }          return 0; | 
