From 248f1fe188fe2307b9dcf2c87a83b653eaa1920c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Sat, 26 Dec 2009 12:42:57 +0000 Subject: synchronized with trunk except for ruby dir git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 66 ++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 20 deletions(-) (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp') diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index b058978ccf..12ac2d2bfd 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -18,21 +18,22 @@ * under the License. * */ -#include "RecoveryManagerImpl.h" - -#include "Message.h" -#include "Queue.h" -#include "Link.h" -#include "Bridge.h" -#include "RecoveredEnqueue.h" -#include "RecoveredDequeue.h" +#include "qpid/broker/RecoveryManagerImpl.h" + +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" -using namespace qpid; -using namespace qpid::broker; using boost::dynamic_pointer_cast; using boost::intrusive_ptr; +namespace qpid { +namespace broker { + RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} @@ -44,10 +45,10 @@ class RecoverableMessageImpl : public RecoverableMessage intrusive_ptr msg; const uint64_t stagingThreshold; public: - RecoverableMessageImpl(const intrusive_ptr& _msg, uint64_t _stagingThreshold) - : msg(_msg), stagingThreshold(_stagingThreshold) {} + RecoverableMessageImpl(const intrusive_ptr& _msg, uint64_t _stagingThreshold); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); + void setRedelivered(); bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(Queue::shared_ptr queue); @@ -59,7 +60,7 @@ class RecoverableQueueImpl : public RecoverableQueue { Queue::shared_ptr queue; public: - RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {} + RecoverableQueueImpl(const boost::shared_ptr& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; @@ -78,7 +79,7 @@ class RecoverableExchangeImpl : public RecoverableExchange public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); - void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); + void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); }; class RecoverableConfigImpl : public RecoverableConfig @@ -102,18 +103,24 @@ public: RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) { - return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues)); + Exchange::shared_ptr e = Exchange::decode(exchanges, buffer); + if (e) { + return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(e, queues)); + } else { + return RecoverableExchange::shared_ptr(); + } } RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) { - Queue::shared_ptr queue = Queue::decode(queues, buffer); + Queue::shared_ptr queue = Queue::decode(queues, buffer, true); try { Exchange::shared_ptr exchange = exchanges.getDefault(); if (exchange) { exchange->bind(queue, queue->getName(), 0); + queue->bound(exchange->getName(), queue->getName(), framing::FieldTable()); } - } catch (const framing::NotFoundException& e) { + } catch (const framing::NotFoundException& /*e*/) { //assume no default exchange has been declared } return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue)); @@ -149,7 +156,16 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer void RecoveryManagerImpl::recoveryComplete() { - //TODO (finalise binding setup etc) + //notify all queues and exchanges + queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges))); + exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); +} + +RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) +{ + if (!msg->isPersistent()) { + msg->forcePersistent(); // set so that message will get dequeued from store. + } } bool RecoverableMessageImpl::loadContent(uint64_t available) @@ -172,6 +188,11 @@ void RecoverableMessageImpl::setPersistenceId(uint64_t id) msg->setPersistenceId(id); } +void RecoverableMessageImpl::setRedelivered() +{ + msg->redeliver(); +} + void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) { dynamic_pointer_cast(msg)->recover(queue); @@ -181,7 +202,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); @@ -215,10 +236,13 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id) bridge->setPersistenceId(id); } -void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) +void RecoverableExchangeImpl::bind(const string& queueName, + const string& key, + framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); exchange->bind(queue, key, &args); + queue->bound(exchange->getName(), key, args); } void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) @@ -251,3 +275,5 @@ void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, Rec { dynamic_pointer_cast(queue)->enqueue(buffer, message); } + +}} -- cgit v1.2.1