summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp34
1 files changed, 32 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index feb629e118..c6ec573822 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -22,6 +22,7 @@
#include "Message.h"
#include "Queue.h"
+#include "Link.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -34,9 +35,9 @@ using boost::intrusive_ptr;
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
DtxManager& _dtxMgr, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -82,6 +83,15 @@ public:
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableConfigImpl : public RecoverableConfig
+{
+ // TODO: Add links for other config types, consider using super class (PersistableConfig?)
+ Link::shared_ptr link;
+public:
+ RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ void setPersistenceId(uint64_t id);
+};
+
class RecoverableTransactionImpl : public RecoverableTransaction
{
DtxBuffer::shared_ptr buffer;
@@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
}
+RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
+{
+ string kind;
+
+ buffer.getShortString (kind);
+ if (kind == "link")
+ {
+ return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
+ }
+
+ return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
exchange->setPersistenceId(id);
}
+void RecoverableConfigImpl::setPersistenceId(uint64_t id)
+{
+ if (link.get())
+ link->setPersistenceId(id);
+ // TODO: add calls to other types. Consider using a parent class.
+}
+
void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
{
Queue::shared_ptr queue = queues.find(queueName);