diff options
| author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
| commit | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch) | |
| tree | d478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/broker | |
| parent | 4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff) | |
| download | qpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz | |
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
27 files changed, 876 insertions, 182 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 456eba7f9d..a8e7b3c368 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -31,10 +31,12 @@ using qpid::framing::Uuid; namespace qpid { namespace broker { -Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : - args(_args), channel(id, &(c.getOutput())), peer(channel), - mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), - connection(c), listener(l), name(Uuid(true).str()) +Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l, + const management::ArgsLinkBridge& _args) : + id(_id), args(_args), + mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest, + args.i_key, args.i_src_is_queue, args.i_src_is_local)), + listener(l), name(Uuid(true).str()) { management::ManagementAgent::getAgent()->addObject(mgmtObject); } @@ -44,18 +46,21 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create() +void Bridge::create(ConnectionState& c) { - framing::AMQP_ServerProxy::Session session(channel); - session.attach(name, false); + channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(name, false); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { if (args.i_src_is_queue) { - peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { string queue = "bridge_queue_"; queue += Uuid(true).str(); @@ -66,22 +71,22 @@ void Bridge::create() if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } + bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? bool autoDelete = !durable;//auto delete transient queues? - peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); - peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } - } void Bridge::cancel() { - peer.getMessage().cancel(args.i_dest); - peer.getSession().detach(name); + peer->getMessage().cancel(args.i_dest); + peer->getSession().detach(name); } management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const @@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man if (methodId == management::Bridge::METHOD_CLOSE) { //notify that we are closed listener(this); - //request time on the connections io thread - connection.getOutput().activateOutput(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index 943050e244..15efcc6482 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -28,33 +28,36 @@ #include "qpid/management/Bridge.h" #include <boost/function.hpp> +#include <memory> namespace qpid { namespace broker { class ConnectionState; +class Link; class Bridge : public management::Manageable { public: typedef boost::function<void(Bridge*)> CancellationListener; - Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, - const management::ArgsLinkBridge& args); + Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args); ~Bridge(); - void create(); + void create(ConnectionState& c); void cancel(); management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args); private: - management::ArgsLinkBridge args; - framing::ChannelHandler channel; - framing::AMQP_ServerProxy peer; - management::Bridge::shared_ptr mgmtObject; - ConnectionState& connection; + std::auto_ptr<framing::ChannelHandler> channelHandler; + std::auto_ptr<framing::AMQP_ServerProxy::Session> session; + std::auto_ptr<framing::AMQP_ServerProxy> peer; + + framing::ChannelId id; + management::ArgsLinkBridge args; + management::Bridge::shared_ptr mgmtObject; CancellationListener listener; std::string name; }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e9b1db0413..d80c13f12a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,6 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "Link.h" #include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -60,7 +61,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), + links(this), factory(*this), sessionManager(conf.ack) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); - ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval); - managementAgent = ManagementAgent::getAgent (); - managementAgent->setInterval (conf.mgmtPubInterval); + ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval, this); + managementAgent = management::ManagementAgent::getAgent (); + ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); @@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) : queues.setParent (vhost); exchanges.setParent (vhost); + links.setParent (vhost); } // Early-Initialize plugins @@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) : queues.setStore (store); dtxManager.setStore (store); + links.setStore (store); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); } @@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - managementAgent->setExchange (mExchange, dExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent + ((ManagementBroker*) managementAgent.get()); } else QPID_LOG(info, "Management not enabled"); @@ -285,7 +290,7 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); - ManagementAgent::shutdown (); + ManagementBroker::shutdown (); delete store; if (config.auth) { #if HAVE_SASL @@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_CONNECT : { management::ArgsBrokerConnect& hp= dynamic_cast<management::ArgsBrokerConnect&>(args); - connect(hp.i_host, hp.i_port); + + if (hp.i_useSsl) + return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED; + + std::pair<Link::shared_ptr, bool> response = + links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable); + if (hp.i_durable && response.second) + store->create(*response.first); + status = Manageable::STATUS_OK; break; } @@ -355,10 +368,11 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( - const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* f) + const std::string& host, uint16_t port, bool /*useSsl*/, + sys::ConnectionCodec::Factory* f, + sys::ProtocolAccess* access) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); } void Broker::connect( @@ -366,7 +380,7 @@ void Broker::connect( { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, f); + connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index e48f3dc23f..a1eaf4f62f 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -29,11 +29,12 @@ #include "ExchangeRegistry.h" #include "MessageStore.h" #include "QueueRegistry.h" +#include "LinkRegistry.h" #include "SessionManager.h" #include "Vhost.h" #include "System.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/management/Broker.h" #include "qpid/management/ArgsBrokerConnect.h" #include "qpid/Options.h" @@ -43,6 +44,7 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" +#include "qpid/sys/ProtocolAccess.h" #include <vector> @@ -111,6 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target, MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } + LinkRegistry& getLinks() { return links; } uint64_t getStagingThreshold() { return config.stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } @@ -130,11 +133,16 @@ class Broker : public sys::Runnable, public Plugin::Target, void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, uint16_t port, - sys::ConnectionCodec::Factory* =0); + void connect(const std::string& host, uint16_t port, bool useSsl, + sys::ConnectionCodec::Factory* =0, + sys::ProtocolAccess* =0); /** Create a connection to another broker. */ void connect(const Url& url, sys::ConnectionCodec::Factory* =0); + // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed + // For the present just return the first ProtocolFactory registered. + boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; + private: boost::shared_ptr<sys::Poller> poller; Options config; @@ -144,6 +152,7 @@ class Broker : public sys::Runnable, public Plugin::Target, QueueRegistry queues; ExchangeRegistry exchanges; + LinkRegistry links; ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; @@ -152,10 +161,6 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; - // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed - // For the present just return the first ProtocolFactory registered. - boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; - void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 1994c4fdf5..d156b4a914 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper management::Client::shared_ptr mgmtClient; public: - MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, + const std::string& mgmtId, bool incoming); ~MgmtClient(); void received(framing::AMQFrame& frame); management::ManagementObject::shared_ptr getManagementObject() const; void closing(); }; -class Connection::MgmtLink : public Connection::MgmtWrapper -{ - typedef boost::ptr_vector<Bridge> Bridges; - - management::Link::shared_ptr mgmtLink; - Bridges created;//holds list of bridges pending creation - Bridges cancelled;//holds list of bridges pending cancellation - Bridges active;//holds active bridges - uint channelCounter; - sys::Mutex linkLock; - - void cancel(Bridge*); - -public: - MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); - ~MgmtLink(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); - void processPending(); - void process(Connection& connection, const management::Args& args); -}; - - Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : ConnectionState(out_, broker_), adapter(*this, isLink), @@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink) if (agent.get () != 0) { if (asLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false)); } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true)); } } } } +void Connection::requestIOProcessing (boost::function0<void> callback) +{ + ioCallback = callback; + out->activateOutput(); +} + + Connection::~Connection () {} void Connection::received(framing::AMQFrame& frame){ @@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::doOutput() { try{ - //process any pending mgmt commands: - if (mgmtWrapper.get()) mgmtWrapper->processPending(); + if (ioCallback) + ioCallback(); // Lend the IO thread for management processing + ioCallback = 0; if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); //then do other output as needed: @@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); } -Manageable::status_t Connection::ManagementMethod (uint32_t methodId, - Args& args) +Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, out->activateOutput(); status = Manageable::STATUS_OK; break; - case management::Link::METHOD_BRIDGE : - //queue this up and request chance to do output (i.e. get connections thread of control): - mgmtWrapper->process(*this, args); - out->activateOutput(); - status = Manageable::STATUS_OK; - break; } return status; } -Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) - : channelCounter(1) -{ - mgmtLink = management::Link::shared_ptr - (new management::Link(conn, parent, mgmtId)); - agent->addObject (mgmtLink); -} - -Connection::MgmtLink::~MgmtLink() -{ - if (mgmtLink.get () != 0) - mgmtLink->resourceDestroy (); -} - -void Connection::MgmtLink::received(framing::AMQFrame& frame) -{ - if (mgmtLink.get () != 0) - { - mgmtLink->inc_framesFromPeer (); - mgmtLink->inc_bytesFromPeer (frame.size ()); - } -} - -management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtLink); -} - -void Connection::MgmtLink::closing() -{ - if (mgmtLink) mgmtLink->set_closing (1); -} - -void Connection::MgmtLink::processPending() -{ - Mutex::ScopedLock l(linkLock); - //process any pending creates - if (!created.empty()) { - for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { - i->create(); - } - active.transfer(active.end(), created.begin(), created.end(), created); - } - if (!cancelled.empty()) { - //process any pending cancellations - for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { - i->cancel(); - } - cancelled.clear(); - } -} - -void Connection::MgmtLink::process(Connection& connection, const management::Args& args) -{ - Mutex::ScopedLock l(linkLock); - created.push_back(new Bridge(channelCounter++, connection, - boost::bind(&MgmtLink::cancel, this, _1), - dynamic_cast<const management::ArgsLinkBridge&>(args))); -} - -void Connection::MgmtLink::cancel(Bridge* b) -{ - Mutex::ScopedLock l(linkLock); - //need to take this out the active map and add it to the cancelled map - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if (&(*i) == b) { - cancelled.transfer(cancelled.end(), i, active); - break; - } - } -} - -Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) +Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, + ManagementAgent::shared_ptr agent, + const std::string& mgmtId, bool incoming) { mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId)); + (new management::Client (conn, parent, mgmtId, incoming)); agent->addObject (mgmtClient); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index c8e7fb7079..dff1e0653b 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -54,6 +54,7 @@ class Connection : public sys::ConnectionInputHandler, public ConnectionState { public: + typedef boost::shared_ptr<Connection> shared_ptr; Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); @@ -78,6 +79,7 @@ class Connection : public sys::ConnectionInputHandler, ManagementMethod (uint32_t methodId, management::Args& args); void initMgmt(bool asLink = false); + void requestIOProcessing (boost::function0<void>); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; @@ -100,7 +102,6 @@ class Connection : public sys::ConnectionInputHandler, virtual void process(Connection&, const management::Args&){} }; class MgmtClient; - class MgmtLink; ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; @@ -108,6 +109,7 @@ class Connection : public sys::ConnectionInputHandler, std::auto_ptr<MgmtWrapper> mgmtWrapper; bool mgmtClosing; const std::string mgmtId; + boost::function0<void> ioCallback; }; }} diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 5de5a0230a..cd015ce4f5 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: } sys::ConnectionCodec* -ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { +ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) { // used to create connections from one broker to another - return new amqp_0_10::Connection(out, broker, id, true); + return new amqp_0_10::Connection(out, broker, id, true, a); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index 5797495054..bf55ab3b88 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -24,6 +24,7 @@ #include "qpid/sys/ConnectionCodec.h" namespace qpid { +namespace sys { class ProtocolAccess; } namespace broker { class Broker; @@ -37,7 +38,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory { create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* - create(sys::OutputControl&, const std::string& id); + create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0); private: Broker& broker; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 4ed2f5bfa2..162664fb88 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -35,8 +35,9 @@ using namespace qpid::framing; namespace { -const std::string PLAIN = "PLAIN"; -const std::string en_US = "en_US"; +const std::string ANONYMOUS = "ANONYMOUS"; +const std::string PLAIN = "PLAIN"; +const std::string en_US = "en_US"; } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId) @@ -135,10 +136,8 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { - string uid = "qpidd"; - string pwd = "qpidd"; - string response = ((char)0) + uid + ((char)0) + pwd; - server.startOk(FieldTable(), PLAIN, response, en_US); + string response; + server.startOk(FieldTable(), ANONYMOUS, response, en_US); connection.initMgmt(true); } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 47d616cf16..0d9ffb7122 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name)); + (new management::Exchange (this, parent, _name, durable)); agent->addObject (mgmtExchange); } } @@ -56,8 +56,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name)); - agent->addObject (mgmtExchange); + (new management::Exchange (this, parent, _name, durable)); + if (!durable) + agent->addObject (mgmtExchange); } } } @@ -68,6 +69,16 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::setPersistenceId(uint64_t id) const +{ + if (mgmtExchange != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtExchange, id, 2); + } + persistenceId = id; +} + Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) { string name; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 7902eb4219..9b18129857 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -90,7 +90,7 @@ namespace qpid { virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; //PersistableExchange: - void setPersistenceId(uint64_t id) const { persistenceId = id; } + void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp new file mode 100644 index 0000000000..83c9a2a62e --- /dev/null +++ b/cpp/src/qpid/broker/Link.cpp @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Link.h" +#include "LinkRegistry.h" +#include "Broker.h" +#include "Connection.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/Link.h" +#include "boost/bind.hpp" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; + +Link::Link(LinkRegistry* _links, + string& _host, + uint16_t _port, + bool _useSsl, + bool _durable, + Broker* _broker, + management::Manageable* parent) + : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + persistenceId(0), broker(_broker), state(0), + access(boost::bind(&Link::established, this), + boost::bind(&Link::closed, this, _1, _2), + boost::bind(&Link::setConnection, this, _1)), + visitCount(0), + currentInterval(1), + closing(false), + channelCounter(1) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) + { + mgmtObject = management::Link::shared_ptr + (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + if (!durable) + agent->addObject(mgmtObject); + } + } + setState(STATE_WAITING); +} + +Link::~Link () +{ + if (state == STATE_OPERATIONAL) + access.close(); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +void Link::setState (int newState) +{ + if (newState == state) + return; + + state = newState; + if (mgmtObject.get() == 0) + return; + + switch (state) + { + case STATE_WAITING : mgmtObject->set_state("Waiting"); break; + case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; + case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + } +} + +void Link::startConnection () +{ + try { + broker->connect (host, port, useSsl, 0, &access); + setState(STATE_CONNECTING); + } catch(std::exception& e) { + setState(STATE_WAITING); + mgmtObject->set_lastError (e.what()); + } +} + +void Link::established () +{ + Mutex::ScopedLock mutex(lock); + + QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); + setState(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); +} + +void Link::closed (int, std::string text) +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_OPERATIONAL) + QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); + + connection.reset(); + created.transfer(created.end(), active.begin(), active.end(), active); + setState(STATE_WAITING); + mgmtObject->set_lastError (text); + if (closing) + destroy(); +} + +void Link::destroy () +{ + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + connection.reset(); + links->destroy (host, port); +} + +void Link::cancel(Bridge* bridge) +{ + Mutex::ScopedLock mutex(lock); + + //need to take this out of the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == bridge) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } + + if (connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::ioThreadProcessing() +{ + Mutex::ScopedLock mutex(lock); + + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(*connection); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void Link::setConnection(Connection::shared_ptr c) +{ + connection = c; + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::maintenanceVisit () +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_WAITING) + { + visitCount++; + if (visitCount >= currentInterval) + { + visitCount = 0; + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnection(); + } + } +} + +void Link::setPersistenceId(uint64_t id) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject, id); + } + persistenceId = id; +} + +const string& Link::getName() const +{ + return host; +} + +Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + + buffer.getShortString(host); + port = buffer.getShort(); + bool useSsl(buffer.getOctet()); + bool durable(buffer.getOctet()); + + return links.declare(host, port, useSsl, durable).first; +} + +void Link::encode(Buffer& buffer) const +{ + buffer.putShortString(string("link")); + buffer.putShortString(host); + buffer.putShort(port); + buffer.putOctet(useSsl ? 1 : 0); + buffer.putOctet(durable ? 1 : 0); +} + +uint32_t Link::encodedSize() const +{ + return host.size() + 1 // short-string (host) + + 5 // short-string ("link") + + 2 // port + + 1 // useSsl + + 1; // durable +} + +ManagementObject::shared_ptr Link::GetManagementObject (void) const +{ + return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) +{ + Mutex::ScopedLock mutex(lock); + + switch (op) + { + case management::Link::METHOD_CLOSE : + closing = true; + if (state != STATE_CONNECTING) + destroy(); + return Manageable::STATUS_OK; + + case management::Link::METHOD_BRIDGE : + management::ArgsLinkBridge iargs = + dynamic_cast<const management::ArgsLinkBridge&>(args); + + // Durable bridges are only valid on durable links + if (iargs.i_durable && !durable) + return Manageable::STATUS_INVALID_PARAMETER; + + created.push_back(new Bridge(this, channelCounter++, + boost::bind(&Link::cancel, this, _1), iargs)); + + if (state == STATE_OPERATIONAL && connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + return Manageable::STATUS_OK; + } + + return Manageable::STATUS_UNKNOWN_METHOD; +} diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h new file mode 100644 index 0000000000..838c3bf696 --- /dev/null +++ b/cpp/src/qpid/broker/Link.h @@ -0,0 +1,115 @@ +#ifndef _broker_Link_h +#define _broker_Link_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include "MessageStore.h" +#include "PersistableConfig.h" +#include "Bridge.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/ProtocolAccess.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Link.h" +#include <boost/ptr_container/ptr_vector.hpp> + +namespace qpid { + namespace broker { + + using std::string; + class LinkRegistry; + class Broker; + class Connection; + + class Link : public PersistableConfig, public management::Manageable { + private: + sys::Mutex lock; + LinkRegistry* links; + const string host; + const uint16_t port; + const bool useSsl; + const bool durable; + mutable uint64_t persistenceId; + management::Link::shared_ptr mgmtObject; + Broker* broker; + int state; + sys::ProtocolAccess access; + uint32_t visitCount; + uint32_t currentInterval; + bool closing; + + typedef boost::ptr_vector<Bridge> Bridges; + Bridges created; // Bridges pending creation + Bridges active; // Bridges active + Bridges cancelled; // Bridges pending deletion + uint channelCounter; + boost::shared_ptr<Connection> connection; + + static const int STATE_WAITING = 1; + static const int STATE_CONNECTING = 2; + static const int STATE_OPERATIONAL = 3; + + static const uint32_t MAX_INTERVAL = 16; + + void setState (int newState); + void startConnection(); // Start the IO Connection + void established(); // Called when connection is created + void closed(int, std::string); // Called when connection goes away + void destroy(); // Called when mgmt deletes this link + void cancel(Bridge*); // Called by self-cancelling bridge + void ioThreadProcessing(); // Called on connection's IO thread by request + void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection + + public: + typedef boost::shared_ptr<Link> shared_ptr; + + Link(LinkRegistry* links, + string& host, + uint16_t port, + bool useSsl, + bool durable, + Broker* broker, + management::Manageable* parent = 0); + virtual ~Link(); + + bool isDurable() { return durable; } + void maintenanceVisit (); + + // PersistableConfig: + void setPersistenceId(uint64_t id) const; + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + const string& getName() const; + + static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&); + }; + } +} + + +#endif /*!_broker_Link.cpp_h*/ diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp new file mode 100644 index 0000000000..6e20a3f7ce --- /dev/null +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LinkRegistry.h" +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; +using std::pair; +using std::stringstream; +using boost::intrusive_ptr; + +#define LINK_MAINT_INTERVAL 5 + +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) +{ + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); +} + +LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : + TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {} + +void LinkRegistry::Periodic::fire () +{ + links.periodicMaintenance (); + links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links))); +} + +void LinkRegistry::periodicMaintenance () +{ + Mutex::ScopedLock locker(lock); + linksToDestroy.clear(); + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) + i->second->maintenanceVisit(); +} + +pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, + uint16_t port, + bool useSsl, + bool durable) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string key = string(keystream.str()); + + LinkMap::iterator i = links.find(key); + if (i == links.end()) + { + Link::shared_ptr link; + + link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent)); + links[key] = link; + return std::pair<Link::shared_ptr, bool>(link, true); + } + return std::pair<Link::shared_ptr, bool>(i->second, false); +} + +void LinkRegistry::destroy(const string& host, const uint16_t port) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string key = string(keystream.str()); + + LinkMap::iterator i = links.find(key); + if (i != links.end()) + { + if (i->second->isDurable() && store) + store->destroy(*(i->second)); + linksToDestroy[key] = i->second; + links.erase(i); + } +} + +void LinkRegistry::setStore (MessageStore* _store) +{ + assert (store == 0 && _store != 0); + store = _store; +} + +MessageStore* LinkRegistry::getStore() const { + return store; +} + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h new file mode 100644 index 0000000000..86d8c3d2f9 --- /dev/null +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -0,0 +1,87 @@ +#ifndef _broker_LinkRegistry_h +#define _broker_LinkRegistry_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <map> +#include "Link.h" +#include "MessageStore.h" +#include "Timer.h" +#include "qpid/sys/Mutex.h" +#include "qpid/management/Manageable.h" + +namespace qpid { +namespace broker { + + class Broker; + class LinkRegistry { + + // Declare a timer task to manage the establishment of link connections and the + // re-establishment of lost link connections. + struct Periodic : public TimerTask + { + LinkRegistry& links; + + Periodic(LinkRegistry& links); + virtual ~Periodic() {}; + void fire(); + }; + + typedef std::map<std::string, Link::shared_ptr> LinkMap; + LinkMap links; + LinkMap linksToDestroy; + qpid::sys::Mutex lock; + Broker* broker; + Timer timer; + management::Manageable* parent; + MessageStore* store; + + void periodicMaintenance (); + + public: + LinkRegistry (Broker* _broker); + std::pair<Link::shared_ptr, bool> declare(std::string& host, + uint16_t port, + bool useSsl, + bool durable); + void destroy(const std::string& host, const uint16_t port); + + /** + * Register the manageable parent for declared queues + */ + void setParent (management::Manageable* _parent) { parent = _parent; } + + /** + * Set the store to use. May only be called once. + */ + void setStore (MessageStore*); + + /** + * Return the message store used. + */ + MessageStore* getStore() const; + }; +} +} + + +#endif /*!_broker_LinkRegistry_h*/ diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 76469ccc50..17fd6aefb8 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -24,6 +24,7 @@ #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" +#include "PersistableConfig.h" #include "RecoveryManager.h" #include "TransactionalStore.h" #include "qpid/framing/FieldTable.h" @@ -87,6 +88,16 @@ public: const std::string& key, const framing::FieldTable& args) = 0; /** + * Record generic durable configuration + */ + virtual void create(const PersistableConfig& config) = 0; + + /** + * Destroy generic durable configuration + */ + virtual void destroy(const PersistableConfig& config) = 0; + + /** * Stores a messages before it has been enqueued * (enqueueing automatically stores the message so this is * only required if storage is required prior to that diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index e02c87f069..2544d5d533 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -70,6 +70,16 @@ void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQ TRANSFER_EXCEPTION(store->unbind(e, q, k, a)); } +void MessageStoreModule::create(const PersistableConfig& config) +{ + TRANSFER_EXCEPTION(store->create(config)); +} + +void MessageStoreModule::destroy(const PersistableConfig& config) +{ + TRANSFER_EXCEPTION(store->destroy(config)); +} + void MessageStoreModule::recover(RecoveryManager& registry) { TRANSFER_EXCEPTION(store->recover(registry)); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index c7ad76d8bb..f4d05e3e0d 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -57,6 +57,8 @@ public: const std::string& key, const framing::FieldTable& args); void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); + void create(const PersistableConfig& config); + void destroy(const PersistableConfig& config); void recover(RecoveryManager& queues); void stage(boost::intrusive_ptr<PersistableMessage>& msg); void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 8936b0440f..401c76f5a2 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -49,7 +49,7 @@ public: using namespace qpid::broker; -NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} +NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {} bool NullMessageStore::init(const Options* /*options*/) {return true;} @@ -57,6 +57,7 @@ void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable { QPID_LOG(info, "Queue '" << queue.getName() << "' will not be durable. Persistence not enabled."); + queue.setPersistenceId(nextPersistenceId++); } void NullMessageStore::destroy(PersistableQueue&) @@ -67,6 +68,7 @@ void NullMessageStore::create(const PersistableExchange& exchange, const framing { QPID_LOG(info, "Exchange'" << exchange.getName() << "' will not be durable. Persistence not enabled."); + exchange.setPersistenceId(nextPersistenceId++); } void NullMessageStore::destroy(const PersistableExchange& ) @@ -76,6 +78,17 @@ void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){} +void NullMessageStore::create(const PersistableConfig& config) +{ + QPID_LOG(info, "Persistence not enabled, configuration not stored."); + config.setPersistenceId(nextPersistenceId++); +} + +void NullMessageStore::destroy(const PersistableConfig&) +{ + QPID_LOG(info, "Persistence not enabled, configuration not stored."); +} + void NullMessageStore::recover(RecoveryManager&) { QPID_LOG(info, "Persistence not enabled, no recovery attempted."); diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 96d1c483a2..f06e749ebb 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -37,6 +37,7 @@ class NullMessageStore : public MessageStore { std::set<std::string> prepared; const bool warn; + uint64_t nextPersistenceId; public: NullMessageStore(bool warn = false); @@ -57,6 +58,8 @@ public: const std::string& key, const framing::FieldTable& args); virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); + virtual void create(const PersistableConfig& config); + virtual void destroy(const PersistableConfig& config); virtual void recover(RecoveryManager& queues); virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg); virtual void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/PersistableConfig.h b/cpp/src/qpid/broker/PersistableConfig.h new file mode 100644 index 0000000000..914e91ea80 --- /dev/null +++ b/cpp/src/qpid/broker/PersistableConfig.h @@ -0,0 +1,45 @@ +#ifndef _broker_PersistableConfig_h +#define _broker_PersistableConfig_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "Persistable.h" + +namespace qpid { +namespace broker { + +/** + * The interface used by general-purpose persistable configuration for + * the message store. + */ +class PersistableConfig : public Persistable +{ +public: + virtual const std::string& getName() const = 0; + virtual ~PersistableConfig() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f7bad8ebc6..355ebdd81e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -586,7 +586,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const if (mgmtObject != 0 && persistenceId == 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject, _persistenceId); + agent->addObject (mgmtObject, _persistenceId, 3); } persistenceId = _persistenceId; } diff --git a/cpp/src/qpid/broker/RecoverableConfig.h b/cpp/src/qpid/broker/RecoverableConfig.h new file mode 100644 index 0000000000..838a8582dc --- /dev/null +++ b/cpp/src/qpid/broker/RecoverableConfig.h @@ -0,0 +1,45 @@ +#ifndef _broker_RecoverableConfig_h +#define _broker_RecoverableConfig_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +/** + * The interface through which configurations are recovered. + */ +class RecoverableConfig +{ +public: + typedef boost::shared_ptr<RecoverableConfig> shared_ptr; + + virtual void setPersistenceId(uint64_t id) = 0; + virtual ~RecoverableConfig() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index bf1813a093..7dcbe3a2b0 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -25,6 +25,7 @@ #include "RecoverableQueue.h" #include "RecoverableMessage.h" #include "RecoverableTransaction.h" +#include "RecoverableConfig.h" #include "TransactionalStore.h" #include "qpid/framing/Buffer.h" @@ -39,6 +40,8 @@ class RecoveryManager{ virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) = 0; + virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0; + virtual void recoveryComplete() = 0; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index feb629e118..c6ec573822 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "Queue.h" +#include "Link.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -34,9 +35,9 @@ using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -82,6 +83,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableConfigImpl : public RecoverableConfig +{ + // TODO: Add links for other config types, consider using super class (PersistableConfig?) + Link::shared_ptr link; +public: + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + void setPersistenceId(uint64_t id); +}; + class RecoverableTransactionImpl : public RecoverableTransaction { DtxBuffer::shared_ptr buffer; @@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } +RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) +{ + string kind; + + buffer.getShortString (kind); + if (kind == "link") + { + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); + } + + return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id) exchange->setPersistenceId(id); } +void RecoverableConfigImpl::setPersistenceId(uint64_t id) +{ + if (link.get()) + link->setPersistenceId(id); + // TODO: add calls to other types. Consider using a parent class. +} + void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index 58ec63926c..cd34d464f5 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -25,6 +25,7 @@ #include "DtxManager.h" #include "ExchangeRegistry.h" #include "QueueRegistry.h" +#include "LinkRegistry.h" #include "RecoveryManager.h" namespace qpid { @@ -33,10 +34,12 @@ namespace broker { class RecoveryManagerImpl : public RecoveryManager{ QueueRegistry& queues; ExchangeRegistry& exchanges; + LinkRegistry& links; DtxManager& dtxMgr; const uint64_t stagingThreshold; public: - RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold); + RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links, + DtxManager& dtxMgr, uint64_t stagingThreshold); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); @@ -44,6 +47,7 @@ namespace broker { RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn); + RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer); void recoveryComplete(); }; diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h index 0d63bd1b3d..65086abec0 100644 --- a/cpp/src/qpid/broker/System.h +++ b/cpp/src/qpid/broker/System.h @@ -42,9 +42,6 @@ class System : public management::Manageable management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } - - management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) - { return management::Manageable::STATUS_OK; } }; }} |
