summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h10
-rw-r--r--cpp/src/qpid/broker/Broker.cpp46
-rw-r--r--cpp/src/qpid/broker/Broker.h37
-rw-r--r--cpp/src/qpid/broker/Channel.h4
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp10
-rw-r--r--cpp/src/qpid/broker/Configuration.h3
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--cpp/src/qpid/broker/Message.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h4
16 files changed, 49 insertions, 95 deletions
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
index 9faa4aa4c4..509ac3bec1 100644
--- a/cpp/src/qpid/broker/AutoDelete.h
+++ b/cpp/src/qpid/broker/AutoDelete.h
@@ -20,17 +20,17 @@
#include <iostream>
#include <queue>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/ThreadFactory.h"
namespace qpid {
namespace broker{
class AutoDelete : private virtual qpid::concurrent::Runnable{
- qpid::concurrent::ThreadFactoryImpl factory;
- qpid::concurrent::MonitorImpl lock;
- qpid::concurrent::MonitorImpl monitor;
+ qpid::concurrent::ThreadFactory factory;
+ qpid::concurrent::Monitor lock;
+ qpid::concurrent::Monitor monitor;
std::queue<Queue::shared_ptr> queues;
QueueRegistry* const registry;
const u_int32_t period;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index fe859b240b..7b5f9e3e32 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -18,60 +18,30 @@
#include <iostream>
#include <memory>
#include "qpid/broker/Broker.h"
-#include "qpid/io/Acceptor.h"
-#include "qpid/broker/Configuration.h"
-#include "qpid/QpidError.h"
-#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/io/LFAcceptor.h"
using namespace qpid::broker;
using namespace qpid::io;
-namespace {
- Acceptor* createAcceptor(const Configuration& config){
- const string type(config.getAcceptor());
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
- }
-}
-
Broker::Broker(const Configuration& config) :
- acceptor(createAcceptor(config)),
- port(config.getPort()),
- isBound(false) {}
+ acceptor(new Acceptor(config.getPort(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads()))
+{ }
+
-Broker::shared_ptr Broker::create(int port)
+Broker::SharedPtr Broker::create(int16_t port)
{
Configuration config;
config.setPort(port);
return create(config);
}
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
+Broker::SharedPtr Broker::create(const Configuration& config) {
+ return Broker::SharedPtr(new Broker(config));
}
-int16_t Broker::bind()
-{
- if (!isBound) {
- port = acceptor->bind(port);
- }
- return port;
-}
-
void Broker::run() {
- bind();
acceptor->run(&factory);
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8581093910..dd87c47909 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -19,47 +19,35 @@
*
*/
-#include "qpid/io/Acceptor.h"
#include "qpid/broker/Configuration.h"
-#include "qpid/concurrent/Runnable.h"
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/Acceptor.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace broker {
/**
* A broker instance.
*/
- class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
- Broker(const Configuration& config); // Private, use create()
- std::auto_ptr<qpid::io::Acceptor> acceptor;
- SessionHandlerFactoryImpl factory;
- int16_t port;
- bool isBound;
-
+ class Broker : public qpid::concurrent::Runnable,
+ public qpid::SharedObject<Broker>
+ {
public:
static const int16_t DEFAULT_PORT;
virtual ~Broker();
- typedef boost::shared_ptr<Broker> shared_ptr;
/**
* Create a broker.
* @param port Port to listen on or 0 to pick a port dynamically.
*/
- static shared_ptr create(int port = DEFAULT_PORT);
+ static SharedPtr create(int16_t port = DEFAULT_PORT);
/**
- * Create a broker from a Configuration.
+ * Create a broker using a Configuration.
*/
- static shared_ptr create(const Configuration& config);
-
- /**
- * Bind to the listening port.
- * @return The port number bound.
- */
- virtual int16_t bind();
+ static SharedPtr create(const Configuration& config);
/**
* Return listening port. If called before bind this is
@@ -67,7 +55,7 @@ namespace qpid {
* port, which will be different if the configured port is
* 0.
*/
- virtual int16_t getPort() { return port; }
+ virtual int16_t getPort() const { return acceptor->getPort(); }
/**
* Run the broker. Implements Runnable::run() so the broker
@@ -77,6 +65,11 @@ namespace qpid {
/** Shut down the broker */
virtual void shutdown();
+
+ private:
+ Broker(const Configuration& config);
+ qpid::io::Acceptor::SharedPtr acceptor;
+ SessionHandlerFactoryImpl factory;
};
}
}
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index f5aa0e45ed..13bd4cd450 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -37,7 +37,7 @@
#include "qpid/broker/TxAck.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -77,7 +77,7 @@ namespace qpid {
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::concurrent::MonitorImpl deliveryLock;
+ qpid::concurrent::Monitor deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
TransactionalStore* store;
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
index 2dcefd878d..550b283d62 100644
--- a/cpp/src/qpid/broker/Configuration.cpp
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -24,10 +24,9 @@ using namespace std;
Configuration::Configuration() :
trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
port('p', "port", "Sets the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
- maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),
connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
- acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
help("help", "Prints usage information", false)
{
options.push_back(&trace);
@@ -35,7 +34,6 @@ Configuration::Configuration() :
options.push_back(&workerThreads);
options.push_back(&maxConnections);
options.push_back(&connectionBacklog);
- options.push_back(&acceptor);
options.push_back(&help);
}
@@ -85,10 +83,6 @@ int Configuration::getConnectionBacklog() const {
return connectionBacklog.getValue();
}
-string Configuration::getAcceptor() const {
- return acceptor.getValue();
-}
-
Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h
index 61ecc89ed9..e1e7c40947 100644
--- a/cpp/src/qpid/broker/Configuration.h
+++ b/cpp/src/qpid/broker/Configuration.h
@@ -92,7 +92,6 @@ namespace qpid {
IntOption workerThreads;
IntOption maxConnections;
IntOption connectionBacklog;
- StringOption acceptor;
BoolOption help;
typedef std::vector<Option*>::iterator op_iterator;
@@ -116,7 +115,6 @@ namespace qpid {
int getWorkerThreads() const;
int getMaxConnections() const;
int getConnectionBacklog() const;
- std::string getAcceptor() const;
void setHelp(bool b) { help.setValue(b); }
void setTrace(bool b) { trace.setValue(b); }
@@ -124,7 +122,6 @@ namespace qpid {
void setWorkerThreads(int i) { workerThreads.setValue(i); }
void setMaxConnections(int i) { maxConnections.setValue(i); }
void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
- void setAcceptor(const std::string& val) { acceptor.setValue(val); }
void usage();
};
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 5c5f78d90a..2c3143cd3c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -23,14 +23,14 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index fca5462e72..c574a97e14 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -20,7 +20,7 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -29,7 +29,7 @@ namespace broker {
class ExchangeRegistry{
typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 334f1ccdcc..83fcdb9b34 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -31,7 +31,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 2e2403361e..cf699ac455 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -22,7 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange {
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 962c74864e..e96cc65b95 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Message.h"
#include <iostream>
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 67fb6764be..88dad7aaf9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -17,7 +17,7 @@
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include <iostream>
using namespace qpid::broker;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 93570f59cc..f954e48c20 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -27,7 +27,7 @@
#include "qpid/broker/ConnectionToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -56,7 +56,7 @@ namespace qpid {
bool queueing;
bool dispatching;
int next;
- mutable qpid::concurrent::MonitorImpl lock;
+ mutable qpid::concurrent::Monitor lock;
apr_time_t lastUsed;
Consumer* exclusive;
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 973201fe64..949c194bbe 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -16,7 +16,7 @@
*
*/
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/SessionHandlerImpl.h"
#include <sstream>
#include <assert.h>
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 6f80291192..4f9e4b882a 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -19,7 +19,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -77,7 +77,7 @@ class QueueRegistry{
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
int counter;
};
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 19ea732fbc..cb773b9a56 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;