diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/AutoDelete.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 46 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Channel.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Configuration.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Configuration.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.h | 4 |
16 files changed, 49 insertions, 95 deletions
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index 9faa4aa4c4..509ac3bec1 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -20,17 +20,17 @@ #include <iostream> #include <queue> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/concurrent/ThreadFactory.h" namespace qpid { namespace broker{ class AutoDelete : private virtual qpid::concurrent::Runnable{ - qpid::concurrent::ThreadFactoryImpl factory; - qpid::concurrent::MonitorImpl lock; - qpid::concurrent::MonitorImpl monitor; + qpid::concurrent::ThreadFactory factory; + qpid::concurrent::Monitor lock; + qpid::concurrent::Monitor monitor; std::queue<Queue::shared_ptr> queues; QueueRegistry* const registry; const u_int32_t period; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fe859b240b..7b5f9e3e32 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -18,60 +18,30 @@ #include <iostream> #include <memory> #include "qpid/broker/Broker.h" -#include "qpid/io/Acceptor.h" -#include "qpid/broker/Configuration.h" -#include "qpid/QpidError.h" -#include "qpid/broker/SessionHandlerFactoryImpl.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/io/LFAcceptor.h" using namespace qpid::broker; using namespace qpid::io; -namespace { - Acceptor* createAcceptor(const Configuration& config){ - const string type(config.getAcceptor()); - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } - throw Configuration::ParseException("Unrecognised acceptor: " + type); - } -} - Broker::Broker(const Configuration& config) : - acceptor(createAcceptor(config)), - port(config.getPort()), - isBound(false) {} + acceptor(new Acceptor(config.getPort(), + config.getConnectionBacklog(), + config.getWorkerThreads())) +{ } + -Broker::shared_ptr Broker::create(int port) +Broker::SharedPtr Broker::create(int16_t port) { Configuration config; config.setPort(port); return create(config); } -Broker::shared_ptr Broker::create(const Configuration& config) { - return Broker::shared_ptr(new Broker(config)); +Broker::SharedPtr Broker::create(const Configuration& config) { + return Broker::SharedPtr(new Broker(config)); } -int16_t Broker::bind() -{ - if (!isBound) { - port = acceptor->bind(port); - } - return port; -} - void Broker::run() { - bind(); acceptor->run(&factory); } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8581093910..dd87c47909 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -19,47 +19,35 @@ * */ -#include "qpid/io/Acceptor.h" #include "qpid/broker/Configuration.h" -#include "qpid/concurrent/Runnable.h" #include "qpid/broker/SessionHandlerFactoryImpl.h" -#include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/Acceptor.h" +#include <qpid/SharedObject.h> namespace qpid { namespace broker { /** * A broker instance. */ - class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { - Broker(const Configuration& config); // Private, use create() - std::auto_ptr<qpid::io::Acceptor> acceptor; - SessionHandlerFactoryImpl factory; - int16_t port; - bool isBound; - + class Broker : public qpid::concurrent::Runnable, + public qpid::SharedObject<Broker> + { public: static const int16_t DEFAULT_PORT; virtual ~Broker(); - typedef boost::shared_ptr<Broker> shared_ptr; /** * Create a broker. * @param port Port to listen on or 0 to pick a port dynamically. */ - static shared_ptr create(int port = DEFAULT_PORT); + static SharedPtr create(int16_t port = DEFAULT_PORT); /** - * Create a broker from a Configuration. + * Create a broker using a Configuration. */ - static shared_ptr create(const Configuration& config); - - /** - * Bind to the listening port. - * @return The port number bound. - */ - virtual int16_t bind(); + static SharedPtr create(const Configuration& config); /** * Return listening port. If called before bind this is @@ -67,7 +55,7 @@ namespace qpid { * port, which will be different if the configured port is * 0. */ - virtual int16_t getPort() { return port; } + virtual int16_t getPort() const { return acceptor->getPort(); } /** * Run the broker. Implements Runnable::run() so the broker @@ -77,6 +65,11 @@ namespace qpid { /** Shut down the broker */ virtual void shutdown(); + + private: + Broker(const Configuration& config); + qpid::io::Acceptor::SharedPtr acceptor; + SessionHandlerFactoryImpl factory; }; } } diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index f5aa0e45ed..13bd4cd450 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -37,7 +37,7 @@ #include "qpid/broker/TxAck.h" #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" @@ -77,7 +77,7 @@ namespace qpid { u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::concurrent::MonitorImpl deliveryLock; + qpid::concurrent::Monitor deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; TransactionalStore* store; diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 2dcefd878d..550b283d62 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -24,10 +24,9 @@ using namespace std; Configuration::Configuration() : trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), port('p', "port", "Sets the port to listen on (default=5672)", 5672), - workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), - maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), + workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), + maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), - acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"), help("help", "Prints usage information", false) { options.push_back(&trace); @@ -35,7 +34,6 @@ Configuration::Configuration() : options.push_back(&workerThreads); options.push_back(&maxConnections); options.push_back(&connectionBacklog); - options.push_back(&acceptor); options.push_back(&help); } @@ -85,10 +83,6 @@ int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } -string Configuration::getAcceptor() const { - return acceptor.getValue(); -} - Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h index 61ecc89ed9..e1e7c40947 100644 --- a/cpp/src/qpid/broker/Configuration.h +++ b/cpp/src/qpid/broker/Configuration.h @@ -92,7 +92,6 @@ namespace qpid { IntOption workerThreads; IntOption maxConnections; IntOption connectionBacklog; - StringOption acceptor; BoolOption help; typedef std::vector<Option*>::iterator op_iterator; @@ -116,7 +115,6 @@ namespace qpid { int getWorkerThreads() const; int getMaxConnections() const; int getConnectionBacklog() const; - std::string getAcceptor() const; void setHelp(bool b) { help.setValue(b); } void setTrace(bool b) { trace.setValue(b); } @@ -124,7 +122,6 @@ namespace qpid { void setWorkerThreads(int i) { workerThreads.setValue(i); } void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } - void setAcceptor(const std::string& val) { acceptor.setValue(val); } void usage(); }; diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 5c5f78d90a..2c3143cd3c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -23,14 +23,14 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index fca5462e72..c574a97e14 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -20,7 +20,7 @@ #include <map> #include "qpid/broker/Exchange.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace broker { @@ -29,7 +29,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map<string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException); void destroy(const string& name); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 334f1ccdcc..83fcdb9b34 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -31,7 +31,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector<Queue::shared_ptr> bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 2e2403361e..cf699ac455 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -22,7 +22,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector<Binding> Bindings; Bindings bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 962c74864e..e96cc65b95 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -15,7 +15,7 @@ * limitations under the License. * */ -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Message.h" #include <iostream> diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 67fb6764be..88dad7aaf9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -17,7 +17,7 @@ */ #include "qpid/broker/Queue.h" #include "qpid/broker/MessageStore.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include <iostream> using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 93570f59cc..f954e48c20 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -27,7 +27,7 @@ #include "qpid/broker/ConnectionToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace broker { @@ -56,7 +56,7 @@ namespace qpid { bool queueing; bool dispatching; int next; - mutable qpid::concurrent::MonitorImpl lock; + mutable qpid::concurrent::Monitor lock; apr_time_t lastUsed; Consumer* exclusive; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 973201fe64..949c194bbe 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,7 +16,7 @@ * */ #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/SessionHandlerImpl.h" #include <sstream> #include <assert.h> diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 6f80291192..4f9e4b882a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -19,7 +19,7 @@ #define _QueueRegistry_ #include <map> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -77,7 +77,7 @@ class QueueRegistry{ private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; int counter; }; diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 19ea732fbc..cb773b9a56 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -71,7 +71,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map<TopicPattern, Queue::vector> BindingMap; BindingMap bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; |
