From 0655ff5aceb9d53eb256a05d7beb55b1c803c8de Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 12 May 2008 17:04:07 +0000 Subject: QPID-1050: Patch from Ted Ross: 1) Durability for federation links (broker-to-broker connections) 2) Improved handling of federation links: a) Links can be created even if the remote broker is not reachable b) If links are lost, re-establishment will occur using an exponential back-off algorithm 3) Durability of exchanges is now viewable through management 4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins. 5) General configuration storage capability has been added to the store/recover interface. This is used for federation links. 6) Management object-ids for durable objects are now themselves durable. (Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 34 +++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp') diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index feb629e118..c6ec573822 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "Queue.h" +#include "Link.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -34,9 +35,9 @@ using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -82,6 +83,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableConfigImpl : public RecoverableConfig +{ + // TODO: Add links for other config types, consider using super class (PersistableConfig?) + Link::shared_ptr link; +public: + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + void setPersistenceId(uint64_t id); +}; + class RecoverableTransactionImpl : public RecoverableTransaction { DtxBuffer::shared_ptr buffer; @@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } +RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) +{ + string kind; + + buffer.getShortString (kind); + if (kind == "link") + { + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); + } + + return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id) exchange->setPersistenceId(id); } +void RecoverableConfigImpl::setPersistenceId(uint64_t id) +{ + if (link.get()) + link->setPersistenceId(id); + // TODO: add calls to other types. Consider using a parent class. +} + void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); -- cgit v1.2.1