diff options
| author | Gordon Sim <gsim@apache.org> | 2006-11-15 10:28:11 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2006-11-15 10:28:11 +0000 |
| commit | 8690d6d8c01335523a8a4b1677979ee1ce51dec0 (patch) | |
| tree | 8df5afa2dcd96a57b9cbf869a7a3a672af4ba134 /cpp/src/qpid/broker | |
| parent | 71ae30ea0b7d3cb4b848ad84fb90c782894cf606 (diff) | |
| download | qpid-python-8690d6d8c01335523a8a4b1677979ee1ce51dec0.tar.gz | |
Added ability for broker to load a message store implementation from a library.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@475181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Configuration.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Configuration.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 79 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.h | 2 |
7 files changed, 150 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 74ee4df952..3f0f00377d 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,7 +29,8 @@ using namespace qpid::sys; Broker::Broker(const Configuration& config) : acceptor(Acceptor::create(config.getPort(), config.getConnectionBacklog(), - config.getWorkerThreads())) + config.getWorkerThreads())), + factory(config.getStore()) { } diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 4cc9cd3a0c..39f5c23ee6 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -30,6 +30,7 @@ Configuration::Configuration() : workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), + store('s', "store", "Sets the message store module to use (default='' which implies no store)", ""), help("help", "Prints usage information", false) { options.push_back(&trace); @@ -37,6 +38,7 @@ Configuration::Configuration() : options.push_back(&workerThreads); options.push_back(&maxConnections); options.push_back(&connectionBacklog); + options.push_back(&store); options.push_back(&help); } @@ -86,6 +88,10 @@ int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } +const std::string& Configuration::getStore() const { + return store.getValue(); +} + Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h index 54b604ac22..1a081764bf 100644 --- a/cpp/src/qpid/broker/Configuration.h +++ b/cpp/src/qpid/broker/Configuration.h @@ -95,6 +95,7 @@ namespace qpid { IntOption workerThreads; IntOption maxConnections; IntOption connectionBacklog; + StringOption store; BoolOption help; typedef std::vector<Option*>::iterator op_iterator; @@ -118,6 +119,7 @@ namespace qpid { int getWorkerThreads() const; int getMaxConnections() const; int getConnectionBacklog() const; + const std::string& getStore() const; void setHelp(bool b) { help.setValue(b); } void setTrace(bool b) { trace.setValue(b); } @@ -125,6 +127,7 @@ namespace qpid { void setWorkerThreads(int i) { workerThreads.setValue(i); } void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } + void setStore(const std::string& s) { store.setValue(s); } void usage(); }; diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp new file mode 100644 index 0000000000..f5c27ca6bc --- /dev/null +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/MessageStoreModule.h> +#include <iostream> + +using namespace qpid::broker; + +MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) +{ +} + +void MessageStoreModule::create(const Queue& queue) +{ + store->create(queue); +} + +void MessageStoreModule::destroy(const Queue& queue) +{ + store->destroy(queue); +} + +void MessageStoreModule::recover(QueueRegistry& registry) +{ + store->recover(registry); +} + +void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->enqueue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->dequeue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::committed(const string * const xid) +{ + store->committed(xid); +} + +void MessageStoreModule::aborted(const string * const xid) +{ + store->aborted(xid); +} + +std::auto_ptr<TransactionContext> MessageStoreModule::begin() +{ + return store->begin(); +} + +void MessageStoreModule::commit(TransactionContext* ctxt) +{ + store->commit(ctxt); +} + +void MessageStoreModule::abort(TransactionContext* ctxt) +{ + store->abort(ctxt); +} diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h new file mode 100644 index 0000000000..490cfbb5c7 --- /dev/null +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStoreModule_ +#define _MessageStoreModule_ + +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageStore.h> +#include <qpid/broker/Queue.h> +#include <qpid/broker/QueueRegistry.h> +#include <qpid/sys/Module.h> + +namespace qpid { + namespace broker { + /** + * A null implementation of the MessageStore interface + */ + class MessageStoreModule : public MessageStore{ + qpid::sys::Module<MessageStore> store; + public: + MessageStoreModule(const std::string& name); + void create(const Queue& queue); + void destroy(const Queue& queue); + void recover(QueueRegistry& queues); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void committed(const string * const xid); + void aborted(const string * const xid); + std::auto_ptr<TransactionContext> begin(); + void commit(TransactionContext* ctxt); + void abort(TransactionContext* ctxt); + ~MessageStoreModule(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index c5f17c006a..5b7bb1ff5e 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -23,6 +23,7 @@ #include <qpid/broker/DirectExchange.h> #include <qpid/broker/FanOutExchange.h> #include <qpid/broker/HeadersExchange.h> +#include <qpid/broker/MessageStoreModule.h> #include <qpid/broker/NullMessageStore.h> #include <qpid/broker/SessionHandlerImpl.h> @@ -38,8 +39,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : - store(new NullMessageStore()), queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : + store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), + queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h index 9575656018..3703efcf89 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h @@ -44,7 +44,7 @@ namespace qpid { const u_int32_t timeout;//timeout for auto-deleted queues (in ms) AutoDelete cleaner; public: - SessionHandlerFactoryImpl(u_int32_t timeout = 30000); + SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000); void recover(); virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); |
