diff options
| author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
| commit | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch) | |
| tree | d478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp | |
| parent | 4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff) | |
| download | qpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz | |
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index feb629e118..c6ec573822 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "Queue.h" +#include "Link.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -34,9 +35,9 @@ using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -82,6 +83,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableConfigImpl : public RecoverableConfig +{ + // TODO: Add links for other config types, consider using super class (PersistableConfig?) + Link::shared_ptr link; +public: + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + void setPersistenceId(uint64_t id); +}; + class RecoverableTransactionImpl : public RecoverableTransaction { DtxBuffer::shared_ptr buffer; @@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } +RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) +{ + string kind; + + buffer.getShortString (kind); + if (kind == "link") + { + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); + } + + return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id) exchange->setPersistenceId(id); } +void RecoverableConfigImpl::setPersistenceId(uint64_t id) +{ + if (link.get()) + link->setPersistenceId(id); + // TODO: add calls to other types. Consider using a parent class. +} + void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); |
