diff options
| author | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
| commit | c256b20602a42a3d33f36bb0e8d9692906d282a6 (patch) | |
| tree | 0fba6619fbb98c3511785143ca30647c5e6a4469 /cpp/broker | |
| parent | 1e6a034ccd8e260e615195bf193aed7d37b928a8 (diff) | |
| download | qpid-python-c256b20602a42a3d33f36bb0e8d9692906d282a6.tar.gz | |
Converted broker to a class for use in tests, plugins etc.
qpid::Exception base class for all exceptions, inherits std::exception.
Require boost on all platforms: http://www.boost.org, 'yum boost' on fedora.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker')
| -rw-r--r-- | cpp/broker/Makefile | 18 | ||||
| -rw-r--r-- | cpp/broker/inc/Broker.h | 86 | ||||
| -rw-r--r-- | cpp/broker/inc/Configuration.h | 34 | ||||
| -rw-r--r-- | cpp/broker/src/Broker.cpp | 94 | ||||
| -rw-r--r-- | cpp/broker/src/Configuration.cpp | 14 |
5 files changed, 164 insertions, 82 deletions
diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile index afe93c455a..5c96589d95 100644 --- a/cpp/broker/Makefile +++ b/cpp/broker/Makefile @@ -15,31 +15,25 @@ # # -# Build broker library and executable. +# Build broker library. # QPID_HOME = ../.. include ${QPID_HOME}/cpp/options.mk - +TARGET=$(BROKER_LIB) SOURCES= $(wildcard src/*.cpp) OBJECTS= $(subst .cpp,.o,$(SOURCES)) -LIB_OBJECTS= $(subst src/Broker.o,,$(OBJECTS)) -EXE_OBJECTS= src/Broker.o - .PHONY: all clean -all: $(BROKER) +all: $(TARGET) @$(MAKE) -C test all clean: - -@rm -f ${OBJECTS} src/*.d ${BROKER} $(BROKER_LIB) + -@rm -f $(TARGET) ${OBJECTS} src/*.d @$(MAKE) -C test clean -$(BROKER): $(BROKER_LIB) $(EXE_OBJECTS) - ${CXX} -o $@ $(EXE_OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB) - -$(BROKER_LIB): $(LIB_OBJECTS) - $(CXX) -shared -o $@ $(LDFLAGS) $(LIB_OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR) +$(TARGET): $(OBJECTS) + $(CXX) -shared -o $@ $(LDFLAGS) $(OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR) -include $(SOURCES:.cpp=.d) diff --git a/cpp/broker/inc/Broker.h b/cpp/broker/inc/Broker.h new file mode 100644 index 0000000000..0cd2bd749e --- /dev/null +++ b/cpp/broker/inc/Broker.h @@ -0,0 +1,86 @@ +#ifndef _Broker_ +#define _Broker_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Acceptor.h" +#include "Configuration.h" +#include "Runnable.h" +#include "SessionHandlerFactoryImpl.h" +#include <boost/noncopyable.hpp> +#include <tr1/memory> + +namespace qpid { + namespace broker { + /** + * A broker instance. + */ + class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { + Broker(const Configuration& config); // Private, use create() + std::auto_ptr<qpid::io::Acceptor> acceptor; + SessionHandlerFactoryImpl factory; + int16_t port; + bool isBound; + + public: + static const int16_t DEFAULT_PORT; + + virtual ~Broker(); + typedef std::tr1::shared_ptr<Broker> shared_ptr; + + /** + * Create a broker. + * @param port Port to listen on or 0 to pick a port dynamically. + */ + static shared_ptr create(int port = DEFAULT_PORT); + + /** + * Create a broker from a Configuration. + */ + static shared_ptr create(const Configuration& config); + + /** + * Bind to the listening port. + * @return The port number bound. + */ + virtual int16_t bind(); + + /** + * Return listening port. If called before bind this is + * the configured port. If called after it is the actual + * port, which will be different if the configured port is + * 0. + */ + virtual int16_t getPort() { return port; } + + /** + * Run the broker. Implements Runnable::run() so the broker + * can be run in a separate thread. + */ + virtual void run(); + + /** Shut down the broker */ + virtual void shutdown(); + }; + } +} + + + +#endif /*!_Broker_*/ diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h index 5ec70a839b..aaabdd23a0 100644 --- a/cpp/broker/inc/Configuration.h +++ b/cpp/broker/inc/Configuration.h @@ -21,11 +21,12 @@ #include <cstdlib> #include <iostream> #include <vector> +#include "Exception.h" namespace qpid { namespace broker { class Configuration{ - class Option{ + class Option { const std::string flag; const std::string name; const std::string desc; @@ -56,6 +57,7 @@ namespace qpid { int getValue() const; virtual bool needsValue() const; virtual void setValue(const std::string& value); + virtual void setValue(int _value) { value = _value; } }; class StringOption : public Option{ @@ -82,6 +84,7 @@ namespace qpid { bool getValue() const; virtual bool needsValue() const; virtual void setValue(const std::string& value); + virtual void setValue(bool _value) { value = _value; } }; BoolOption trace; @@ -96,10 +99,9 @@ namespace qpid { std::vector<Option*> options; public: - class ParseException{ - public: - const std::string& error; - ParseException(const std::string& _error) : error(_error) {} + class ParseException : public Exception { + public: + ParseException(const std::string& msg) : Exception(msg) {} }; @@ -108,13 +110,21 @@ namespace qpid { void parse(int argc, char** argv); - bool isHelp(); - bool isTrace(); - int getPort(); - int getWorkerThreads(); - int getMaxConnections(); - int getConnectionBacklog(); - const std::string& getAcceptor(); + bool isHelp() const; + bool isTrace() const; + int getPort() const; + int getWorkerThreads() const; + int getMaxConnections() const; + int getConnectionBacklog() const; + std::string getAcceptor() const; + + void setHelp(bool b) { help.setValue(b); } + void setTrace(bool b) { trace.setValue(b); } + void setPort(int i) { port.setValue(i); } + void setWorkerThreads(int i) { workerThreads.setValue(i); } + void setMaxConnections(int i) { maxConnections.setValue(i); } + void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } + void setAcceptor(const std::string& val) { acceptor.setValue(val); } void usage(); }; diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp index 99cf8d6ce4..b6472d1729 100644 --- a/cpp/broker/src/Broker.cpp +++ b/cpp/broker/src/Broker.cpp @@ -17,76 +17,68 @@ */ #include <iostream> #include <memory> -#include "apr_signal.h" - +#include "Broker.h" #include "Acceptor.h" #include "Configuration.h" #include "QpidError.h" #include "SessionHandlerFactoryImpl.h" - -//optional includes: -#ifdef _USE_APR_IO_ - #include "BlockingAPRAcceptor.h" #include "LFAcceptor.h" -#endif using namespace qpid::broker; using namespace qpid::io; -void handle_signal(int signal); +namespace { + Acceptor* createAcceptor(const Configuration& config){ + const string type(config.getAcceptor()); + if("blocking" == type){ + std::cout << "Using blocking acceptor " << std::endl; + return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); + }else if("non-blocking" == type){ + std::cout << "Using non-blocking acceptor " << std::endl; + return new LFAcceptor(config.isTrace(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.getMaxConnections()); + } + throw Configuration::ParseException("Unrecognised acceptor: " + type); + } +} -Acceptor* createAcceptor(Configuration& config); +Broker::Broker(const Configuration& config) : + acceptor(createAcceptor(config)), + port(config.getPort()), + isBound(false) {} -int main(int argc, char** argv) +Broker::shared_ptr Broker::create(int port) { - SessionHandlerFactoryImpl factory; Configuration config; - try{ + config.setPort(port); + return create(config); +} - config.parse(argc, argv); - if(config.isHelp()){ - config.usage(); - }else{ -#ifdef _USE_APR_IO_ - apr_signal(SIGINT, handle_signal); -#endif - try{ - std::auto_ptr<Acceptor> acceptor(createAcceptor(config)); - try{ - acceptor->bind(config.getPort(), &factory); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - } - }catch(Configuration::ParseException error){ - std::cout << "Error: " << error.error << std::endl; +Broker::shared_ptr Broker::create(const Configuration& config) { + return Broker::shared_ptr(new Broker(config)); +} + +int16_t Broker::bind() +{ + if (!isBound) { + port = acceptor->bind(port); } - - return 1; + return port; } -Acceptor* createAcceptor(Configuration& config){ - const string type(config.getAcceptor()); -#ifdef _USE_APR_IO_ - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } -#endif - throw Configuration::ParseException("Unrecognised acceptor: " + type); +void Broker::run() { + bind(); + acceptor->run(&factory); } -void handle_signal(int /*signal*/){ - std::cout << "Shutting down..." << std::endl; +void Broker::shutdown() { + acceptor->shutdown(); } + +Broker::~Broker() { } + +const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp index 11c2d374fe..6e7df7889e 100644 --- a/cpp/broker/src/Configuration.cpp +++ b/cpp/broker/src/Configuration.cpp @@ -61,31 +61,31 @@ void Configuration::usage(){ } } -bool Configuration::isHelp(){ +bool Configuration::isHelp() const { return help.getValue(); } -bool Configuration::isTrace(){ +bool Configuration::isTrace() const { return trace.getValue(); } -int Configuration::getPort(){ +int Configuration::getPort() const { return port.getValue(); } -int Configuration::getWorkerThreads(){ +int Configuration::getWorkerThreads() const { return workerThreads.getValue(); } -int Configuration::getMaxConnections(){ +int Configuration::getMaxConnections() const { return maxConnections.getValue(); } -int Configuration::getConnectionBacklog(){ +int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } -const string& Configuration::getAcceptor(){ +string Configuration::getAcceptor() const { return acceptor.getValue(); } |
