diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index f3e1639ca5..901b3ad15a 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -25,6 +25,8 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/Protocol.h" +#include "qpid/broker/RecoverableMessageImpl.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -38,26 +40,11 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} + DtxManager& _dtxMgr, ProtocolRegistry& p) + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} -class RecoverableMessageImpl : public RecoverableMessage -{ - Message msg; -public: - RecoverableMessageImpl(const Message& _msg); - ~RecoverableMessageImpl() {}; - void setPersistenceId(uint64_t id); - void setRedelivered(); - bool loadContent(uint64_t available); - void decodeContent(framing::Buffer& buffer); - void recover(Queue::shared_ptr queue); - void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); - void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); -}; - class RecoverableQueueImpl : public RecoverableQueue { Queue::shared_ptr queue; @@ -82,6 +69,7 @@ public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args, AsyncStore* const store); + string getName() const { return exchange->getName(); } }; class RecoverableConfigImpl : public RecoverableConfig @@ -130,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - //TODO: determine encoding/version actually used - boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); - transfer->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + framing::Buffer sniffer(buffer.getPointer(), buffer.available()); + RecoverableMessage::shared_ptr m = protocols.recover(sniffer); + if (m) { + return m; + } else { + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + } } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, @@ -193,6 +186,11 @@ void RecoverableMessageImpl::setRedelivered() msg.deliver();//increment delivery count (but at present that isn't recorded durably) } +void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep) +{ + msg.computeExpiration(ep); +} + void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) { dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); |