diff options
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index feb629e118..c6ec573822 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "Queue.h" +#include "Link.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -34,9 +35,9 @@ using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -82,6 +83,15 @@ public: void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); }; +class RecoverableConfigImpl : public RecoverableConfig +{ + // TODO: Add links for other config types, consider using super class (PersistableConfig?) + Link::shared_ptr link; +public: + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + void setPersistenceId(uint64_t id); +}; + class RecoverableTransactionImpl : public RecoverableTransaction { DtxBuffer::shared_ptr buffer; @@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } +RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) +{ + string kind; + + buffer.getShortString (kind); + if (kind == "link") + { + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); + } + + return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead +} + void RecoveryManagerImpl::recoveryComplete() { //TODO (finalise binding setup etc) @@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id) exchange->setPersistenceId(id); } +void RecoverableConfigImpl::setPersistenceId(uint64_t id) +{ + if (link.get()) + link->setPersistenceId(id); + // TODO: add calls to other types. Consider using a parent class. +} + void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); |
