summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
committerAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
commit87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 (patch)
tree03929353228d063bf8892d6c57d4bf46fa467ddd /cpp/lib
parentfa15e6d52022cc1576b19e3caaecf66260c1923e (diff)
downloadqpid-python-87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274.tar.gz
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/Broker.cpp55
-rw-r--r--cpp/lib/broker/Broker.h20
-rw-r--r--cpp/lib/broker/SessionHandlerFactoryImpl.cpp48
-rw-r--r--cpp/lib/broker/SessionHandlerFactoryImpl.h43
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp58
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.h13
-rw-r--r--cpp/lib/common/framing/Responder.cpp4
-rw-r--r--cpp/lib/common/framing/Responder.h2
8 files changed, 134 insertions, 109 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 6c0d7a3f3f..6a8b1f8538 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -20,19 +20,61 @@
*/
#include <iostream>
#include <memory>
-#include <Broker.h>
+#include "AMQFrame.h"
+#include "DirectExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "ProtocolInitiation.h"
+#include "SessionHandlerImpl.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/TimeoutHandler.h"
-using namespace qpid::broker;
-using namespace qpid::sys;
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
Broker::Broker(const Configuration& config) :
acceptor(Acceptor::create(config.getPort(),
config.getConnectionBacklog(),
config.getWorkerThreads(),
config.isTrace())),
- factory(config.getStore())
-{ }
+ queues(store.get()),
+ timeout(30000),
+ stagingThreshold(0),
+ cleaner(&queues, timeout/10),
+ factory(*this)
+{
+ if (config.getStore().empty())
+ store.reset(new NullMessageStore());
+ else
+ store.reset(new MessageStoreModule(config.getStore()));
+
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ MessageStoreSettings storeSettings = { getStagingThreshold() };
+ store->recover(recoverer, &storeSettings);
+ }
+
+ cleaner.start();
+}
Broker::shared_ptr Broker::create(int16_t port)
@@ -57,3 +99,6 @@ void Broker::shutdown() {
Broker::~Broker() { }
const int16_t Broker::DEFAULT_PORT(5672);
+
+
+}} // namespace qpid::broker
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index 8ea1a57c27..f831b680e9 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -27,6 +27,8 @@
#include <sys/Runnable.h>
#include <sys/Acceptor.h>
#include <SharedObject.h>
+#include <MessageStore.h>
+#include <AutoDelete.h>
namespace qpid {
namespace broker {
@@ -69,13 +71,27 @@ class Broker : public qpid::sys::Runnable,
/** Shut down the broker */
virtual void shutdown();
+ MessageStore& getStore() { return *store; }
+ QueueRegistry& getQueues() { return queues; }
+ ExchangeRegistry& getExchanges() { return exchanges; }
+ u_int32_t getTimeout() { return timeout; }
+ u_int64_t getStagingThreshold() { return stagingThreshold; }
+ AutoDelete& getCleaner() { return cleaner; }
+
private:
Broker(const Configuration& config);
+
qpid::sys::Acceptor::shared_ptr acceptor;
+ std::auto_ptr<MessageStore> store;
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ u_int32_t timeout;
+ u_int64_t stagingThreshold;
+ AutoDelete cleaner;
SessionHandlerFactoryImpl factory;
};
-}
-}
+
+}}
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
index 1b5441e3cf..559fd6bca1 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
@@ -19,51 +19,25 @@
*
*/
#include <SessionHandlerFactoryImpl.h>
-
-#include <DirectExchange.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <MessageStoreModule.h>
-#include <NullMessageStore.h>
#include <SessionHandlerImpl.h>
-using namespace qpid::broker;
-using namespace qpid::sys;
+namespace qpid {
+namespace broker {
-namespace
-{
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) :
- store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)),
- queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
-{
- exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- exchanges.declare(amq_direct, DirectExchange::typeName);
- exchanges.declare(amq_topic, TopicExchange::typeName);
- exchanges.declare(amq_fanout, FanOutExchange::typeName);
- exchanges.declare(amq_match, HeadersExchange::typeName);
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b)
+{}
- if(store.get()) {
- RecoveryManager recoverer(queues, exchanges);
- MessageStoreSettings storeSettings = { settings.stagingThreshold };
- store->recover(recoverer, &storeSettings);
- }
- cleaner.start();
-}
-
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
{
- return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
+ broker.getCleaner().stop();
}
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+qpid::sys::SessionHandler*
+SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt)
{
- cleaner.stop();
+ return new SessionHandlerImpl(ctxt, broker);
}
+
+}} // namespace qpid::broker
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h
index a69b67b08d..49c42b4d1c 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h
@@ -21,37 +21,26 @@
#ifndef _SessionHandlerFactoryImpl_
#define _SessionHandlerFactoryImpl_
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <MessageStore.h>
-#include <QueueRegistry.h>
-#include <AMQFrame.h>
-#include <ProtocolInitiation.h>
-#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
-#include <sys/SessionHandlerFactory.h>
-#include <sys/TimeoutHandler.h>
-#include <SessionHandlerImpl.h>
-#include <memory>
+#include "SessionHandlerFactory.h"
namespace qpid {
- namespace broker {
+namespace broker {
+class Broker;
- class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
- {
- std::auto_ptr<MessageStore> store;
- QueueRegistry queues;
- ExchangeRegistry exchanges;
- const Settings settings;
- AutoDelete cleaner;
- public:
- SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
- virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
- virtual ~SessionHandlerFactoryImpl();
- };
+class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory
+{
+ public:
+ SessionHandlerFactoryImpl(Broker& b);
+
+ virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+
+ virtual ~SessionHandlerFactoryImpl();
- }
-}
+ private:
+ Broker& broker;
+};
+
+}}
#endif
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index f34ef59922..905ac83b92 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -26,21 +26,22 @@
#include "assert.h"
using namespace boost;
-using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
- QueueRegistry* _queues,
- ExchangeRegistry* _exchanges,
- AutoDelete* _cleaner,
- const Settings& _settings) :
+namespace qpid {
+namespace broker {
+
+SessionHandlerImpl::SessionHandlerImpl(
+ SessionContext* _context, Broker& broker) :
+
context(_context),
- queues(_queues),
- exchanges(_exchanges),
- cleaner(_cleaner),
- settings(_settings),
+ client(0),
+ queues(broker.getQueues()),
+ exchanges(broker.getExchanges()),
+ cleaner(broker.getCleaner()),
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -49,10 +50,8 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
txHandler(new TxHandlerImpl(this)),
messageHandler(new MessageHandlerImpl(this)),
framemax(65536),
- heartbeat(0){
-
- client =NULL;
-}
+ heartbeat(0)
+{}
SessionHandlerImpl::~SessionHandlerImpl(){
@@ -75,7 +74,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
queue = getChannel(channel)->getDefaultQueue();
if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
} else {
- queue = queues->find(name);
+ queue = queues.find(name);
if (queue == 0) {
throw ChannelException( 404, "Queue not found: " + name);
}
@@ -85,7 +84,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
- return exchanges->get(name);
+ return exchanges.get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -96,8 +95,10 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
switch(body->type())
{
case REQUEST_BODY:
+ // responder.received(frame);
case RESPONSE_BODY:
- case METHOD_BODY:
+ // requester.received(frame);
+ case METHOD_BODY: //
method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
try{
method->invoke(*this, channel);
@@ -164,7 +165,7 @@ void SessionHandlerImpl::closed(){
}
for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
string name = (*i)->getName();
- queues->destroy(name);
+ queues.destroy(name);
exclusiveQueues.erase(i);
}
} catch(std::exception& e) {
@@ -221,7 +222,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
parent->channels[channel] = new Channel(
parent->client->getProtocolVersion() , parent->context, channel,
- parent->framemax, parent->queues->getStore(),
+ parent->framemax, parent->queues.getStore(),
parent->settings.stagingThreshold);
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
@@ -251,12 +252,12 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
const FieldTable& /*arguments*/){
if(passive){
- if(!parent->exchanges->get(exchange)){
+ if(!parent->exchanges.get(exchange)){
throw ChannelException(404, "Exchange not found: " + exchange);
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
if(!response.second && response.first->getType() != type){
throw ConnectionException(507, "Exchange already declared to be of type "
+ response.first->getType() + ", requested " + type);
@@ -288,7 +289,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->destroy(exchange);
+ parent->exchanges.destroy(exchange);
if(!nowait) parent->client->getExchange().deleteOk(channel);
}
@@ -300,7 +301,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+ parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -310,11 +311,11 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue_created.first->create(arguments);
//add default binding:
- parent->exchanges->getDefault()->bind(queue, name, 0);
+ parent->exchanges.getDefault()->bind(queue, name, 0);
if (exclusive) {
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
- parent->cleaner->add(queue);
+ parent->cleaner.add(queue);
}
}
}
@@ -332,7 +333,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
const FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -369,7 +370,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
}
count = q->getMessageCount();
q->destroy();
- parent->queues->destroy(queue);
+ parent->queues.destroy(queue);
}
if(!nowait) parent->client->getQueue().deleteOk(channel, count);
@@ -424,7 +425,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
- Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);
@@ -652,3 +653,4 @@ SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
+}}
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h
index 25803fe1f7..08b05a11b6 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/SessionHandlerImpl.h
@@ -40,6 +40,7 @@
#include <sys/SessionHandler.h>
#include <sys/TimeoutHandler.h>
#include <TopicExchange.h>
+#include "Broker.h"
namespace qpid {
namespace broker {
@@ -77,11 +78,10 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
qpid::sys::SessionContext* context;
qpid::framing::AMQP_ClientProxy* client;
- QueueRegistry* queues;
- ExchangeRegistry* const exchanges;
- AutoDelete* const cleaner;
- const Settings settings;
-
+ QueueRegistry& queues;
+ ExchangeRegistry& exchanges;
+ AutoDelete& cleaner;
+ Settings settings;
std::auto_ptr<BasicHandler> basicHandler;
std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -112,8 +112,7 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
Exchange::shared_ptr findExchange(const string& name);
public:
- SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
- ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
+ SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker);
virtual void received(qpid::framing::AMQFrame* frame);
virtual void initiated(qpid::framing::ProtocolInitiation* header);
virtual void idleOut();
diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp
index efe3609c7b..1fbbfb8542 100644
--- a/cpp/lib/common/framing/Responder.cpp
+++ b/cpp/lib/common/framing/Responder.cpp
@@ -30,9 +30,9 @@ void Responder::received(const AMQRequestBody::Data& request) {
responseMark = request.responseMark;
}
-void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) {
+void Responder::sending(AMQResponseBody::Data& response) {
response.responseId = ++lastId;
- response.requestId = toRequest;
+ // response.requestId should have been set by caller.
response.batchOffset = 0;
}
diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h
index a11967acc1..0e1785256b 100644
--- a/cpp/lib/common/framing/Responder.h
+++ b/cpp/lib/common/framing/Responder.h
@@ -40,7 +40,7 @@ class Responder
void received(const AMQRequestBody::Data& request);
/** Called before sending a response to set respose data. */
- void sending(AMQResponseBody::Data& response, RequestId toRequest);
+ void sending(AMQResponseBody::Data& response);
/** Get the ID of the highest response acknowledged by the peer. */
ResponseId getResponseMark() { return responseMark; }