summaryrefslogtreecommitdiff
path: root/cpp/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-12 18:52:49 +0000
committerAlan Conway <aconway@apache.org>2006-10-12 18:52:49 +0000
commitc256b20602a42a3d33f36bb0e8d9692906d282a6 (patch)
tree0fba6619fbb98c3511785143ca30647c5e6a4469 /cpp/broker
parent1e6a034ccd8e260e615195bf193aed7d37b928a8 (diff)
downloadqpid-python-c256b20602a42a3d33f36bb0e8d9692906d282a6.tar.gz
Converted broker to a class for use in tests, plugins etc.
qpid::Exception base class for all exceptions, inherits std::exception. Require boost on all platforms: http://www.boost.org, 'yum boost' on fedora. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker')
-rw-r--r--cpp/broker/Makefile18
-rw-r--r--cpp/broker/inc/Broker.h86
-rw-r--r--cpp/broker/inc/Configuration.h34
-rw-r--r--cpp/broker/src/Broker.cpp94
-rw-r--r--cpp/broker/src/Configuration.cpp14
5 files changed, 164 insertions, 82 deletions
diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile
index afe93c455a..5c96589d95 100644
--- a/cpp/broker/Makefile
+++ b/cpp/broker/Makefile
@@ -15,31 +15,25 @@
#
#
-# Build broker library and executable.
+# Build broker library.
#
QPID_HOME = ../..
include ${QPID_HOME}/cpp/options.mk
-
+TARGET=$(BROKER_LIB)
SOURCES= $(wildcard src/*.cpp)
OBJECTS= $(subst .cpp,.o,$(SOURCES))
-LIB_OBJECTS= $(subst src/Broker.o,,$(OBJECTS))
-EXE_OBJECTS= src/Broker.o
-
.PHONY: all clean
-all: $(BROKER)
+all: $(TARGET)
@$(MAKE) -C test all
clean:
- -@rm -f ${OBJECTS} src/*.d ${BROKER} $(BROKER_LIB)
+ -@rm -f $(TARGET) ${OBJECTS} src/*.d
@$(MAKE) -C test clean
-$(BROKER): $(BROKER_LIB) $(EXE_OBJECTS)
- ${CXX} -o $@ $(EXE_OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB)
-
-$(BROKER_LIB): $(LIB_OBJECTS)
- $(CXX) -shared -o $@ $(LDFLAGS) $(LIB_OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR)
+$(TARGET): $(OBJECTS)
+ $(CXX) -shared -o $@ $(LDFLAGS) $(OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR)
-include $(SOURCES:.cpp=.d)
diff --git a/cpp/broker/inc/Broker.h b/cpp/broker/inc/Broker.h
new file mode 100644
index 0000000000..0cd2bd749e
--- /dev/null
+++ b/cpp/broker/inc/Broker.h
@@ -0,0 +1,86 @@
+#ifndef _Broker_
+#define _Broker_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Acceptor.h"
+#include "Configuration.h"
+#include "Runnable.h"
+#include "SessionHandlerFactoryImpl.h"
+#include <boost/noncopyable.hpp>
+#include <tr1/memory>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A broker instance.
+ */
+ class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
+ Broker(const Configuration& config); // Private, use create()
+ std::auto_ptr<qpid::io::Acceptor> acceptor;
+ SessionHandlerFactoryImpl factory;
+ int16_t port;
+ bool isBound;
+
+ public:
+ static const int16_t DEFAULT_PORT;
+
+ virtual ~Broker();
+ typedef std::tr1::shared_ptr<Broker> shared_ptr;
+
+ /**
+ * Create a broker.
+ * @param port Port to listen on or 0 to pick a port dynamically.
+ */
+ static shared_ptr create(int port = DEFAULT_PORT);
+
+ /**
+ * Create a broker from a Configuration.
+ */
+ static shared_ptr create(const Configuration& config);
+
+ /**
+ * Bind to the listening port.
+ * @return The port number bound.
+ */
+ virtual int16_t bind();
+
+ /**
+ * Return listening port. If called before bind this is
+ * the configured port. If called after it is the actual
+ * port, which will be different if the configured port is
+ * 0.
+ */
+ virtual int16_t getPort() { return port; }
+
+ /**
+ * Run the broker. Implements Runnable::run() so the broker
+ * can be run in a separate thread.
+ */
+ virtual void run();
+
+ /** Shut down the broker */
+ virtual void shutdown();
+ };
+ }
+}
+
+
+
+#endif /*!_Broker_*/
diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h
index 5ec70a839b..aaabdd23a0 100644
--- a/cpp/broker/inc/Configuration.h
+++ b/cpp/broker/inc/Configuration.h
@@ -21,11 +21,12 @@
#include <cstdlib>
#include <iostream>
#include <vector>
+#include "Exception.h"
namespace qpid {
namespace broker {
class Configuration{
- class Option{
+ class Option {
const std::string flag;
const std::string name;
const std::string desc;
@@ -56,6 +57,7 @@ namespace qpid {
int getValue() const;
virtual bool needsValue() const;
virtual void setValue(const std::string& value);
+ virtual void setValue(int _value) { value = _value; }
};
class StringOption : public Option{
@@ -82,6 +84,7 @@ namespace qpid {
bool getValue() const;
virtual bool needsValue() const;
virtual void setValue(const std::string& value);
+ virtual void setValue(bool _value) { value = _value; }
};
BoolOption trace;
@@ -96,10 +99,9 @@ namespace qpid {
std::vector<Option*> options;
public:
- class ParseException{
- public:
- const std::string& error;
- ParseException(const std::string& _error) : error(_error) {}
+ class ParseException : public Exception {
+ public:
+ ParseException(const std::string& msg) : Exception(msg) {}
};
@@ -108,13 +110,21 @@ namespace qpid {
void parse(int argc, char** argv);
- bool isHelp();
- bool isTrace();
- int getPort();
- int getWorkerThreads();
- int getMaxConnections();
- int getConnectionBacklog();
- const std::string& getAcceptor();
+ bool isHelp() const;
+ bool isTrace() const;
+ int getPort() const;
+ int getWorkerThreads() const;
+ int getMaxConnections() const;
+ int getConnectionBacklog() const;
+ std::string getAcceptor() const;
+
+ void setHelp(bool b) { help.setValue(b); }
+ void setTrace(bool b) { trace.setValue(b); }
+ void setPort(int i) { port.setValue(i); }
+ void setWorkerThreads(int i) { workerThreads.setValue(i); }
+ void setMaxConnections(int i) { maxConnections.setValue(i); }
+ void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
+ void setAcceptor(const std::string& val) { acceptor.setValue(val); }
void usage();
};
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp
index 99cf8d6ce4..b6472d1729 100644
--- a/cpp/broker/src/Broker.cpp
+++ b/cpp/broker/src/Broker.cpp
@@ -17,76 +17,68 @@
*/
#include <iostream>
#include <memory>
-#include "apr_signal.h"
-
+#include "Broker.h"
#include "Acceptor.h"
#include "Configuration.h"
#include "QpidError.h"
#include "SessionHandlerFactoryImpl.h"
-
-//optional includes:
-#ifdef _USE_APR_IO_
-
#include "BlockingAPRAcceptor.h"
#include "LFAcceptor.h"
-#endif
using namespace qpid::broker;
using namespace qpid::io;
-void handle_signal(int signal);
+namespace {
+ Acceptor* createAcceptor(const Configuration& config){
+ const string type(config.getAcceptor());
+ if("blocking" == type){
+ std::cout << "Using blocking acceptor " << std::endl;
+ return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
+ }else if("non-blocking" == type){
+ std::cout << "Using non-blocking acceptor " << std::endl;
+ return new LFAcceptor(config.isTrace(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.getMaxConnections());
+ }
+ throw Configuration::ParseException("Unrecognised acceptor: " + type);
+ }
+}
-Acceptor* createAcceptor(Configuration& config);
+Broker::Broker(const Configuration& config) :
+ acceptor(createAcceptor(config)),
+ port(config.getPort()),
+ isBound(false) {}
-int main(int argc, char** argv)
+Broker::shared_ptr Broker::create(int port)
{
- SessionHandlerFactoryImpl factory;
Configuration config;
- try{
+ config.setPort(port);
+ return create(config);
+}
- config.parse(argc, argv);
- if(config.isHelp()){
- config.usage();
- }else{
-#ifdef _USE_APR_IO_
- apr_signal(SIGINT, handle_signal);
-#endif
- try{
- std::auto_ptr<Acceptor> acceptor(createAcceptor(config));
- try{
- acceptor->bind(config.getPort(), &factory);
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
- }
- }catch(Configuration::ParseException error){
- std::cout << "Error: " << error.error << std::endl;
+Broker::shared_ptr Broker::create(const Configuration& config) {
+ return Broker::shared_ptr(new Broker(config));
+}
+
+int16_t Broker::bind()
+{
+ if (!isBound) {
+ port = acceptor->bind(port);
}
-
- return 1;
+ return port;
}
-Acceptor* createAcceptor(Configuration& config){
- const string type(config.getAcceptor());
-#ifdef _USE_APR_IO_
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
-#endif
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
+void Broker::run() {
+ bind();
+ acceptor->run(&factory);
}
-void handle_signal(int /*signal*/){
- std::cout << "Shutting down..." << std::endl;
+void Broker::shutdown() {
+ acceptor->shutdown();
}
+
+Broker::~Broker() { }
+
+const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp
index 11c2d374fe..6e7df7889e 100644
--- a/cpp/broker/src/Configuration.cpp
+++ b/cpp/broker/src/Configuration.cpp
@@ -61,31 +61,31 @@ void Configuration::usage(){
}
}
-bool Configuration::isHelp(){
+bool Configuration::isHelp() const {
return help.getValue();
}
-bool Configuration::isTrace(){
+bool Configuration::isTrace() const {
return trace.getValue();
}
-int Configuration::getPort(){
+int Configuration::getPort() const {
return port.getValue();
}
-int Configuration::getWorkerThreads(){
+int Configuration::getWorkerThreads() const {
return workerThreads.getValue();
}
-int Configuration::getMaxConnections(){
+int Configuration::getMaxConnections() const {
return maxConnections.getValue();
}
-int Configuration::getConnectionBacklog(){
+int Configuration::getConnectionBacklog() const {
return connectionBacklog.getValue();
}
-const string& Configuration::getAcceptor(){
+string Configuration::getAcceptor() const {
return acceptor.getValue();
}