summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
commit248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch)
treed5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp
parent3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff)
downloadqpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp66
1 files changed, 46 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index b058978ccf..12ac2d2bfd 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -18,21 +18,22 @@
* under the License.
*
*/
-#include "RecoveryManagerImpl.h"
-
-#include "Message.h"
-#include "Queue.h"
-#include "Link.h"
-#include "Bridge.h"
-#include "RecoveredEnqueue.h"
-#include "RecoveredDequeue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
-using namespace qpid;
-using namespace qpid::broker;
using boost::dynamic_pointer_cast;
using boost::intrusive_ptr;
+namespace qpid {
+namespace broker {
+
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
DtxManager& _dtxMgr, uint64_t _stagingThreshold)
: queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
@@ -44,10 +45,10 @@ class RecoverableMessageImpl : public RecoverableMessage
intrusive_ptr<Message> msg;
const uint64_t stagingThreshold;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold)
- : msg(_msg), stagingThreshold(_stagingThreshold) {}
+ RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
+ void setRedelivered();
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(Queue::shared_ptr queue);
@@ -59,7 +60,7 @@ class RecoverableQueueImpl : public RecoverableQueue
{
Queue::shared_ptr queue;
public:
- RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+ RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
@@ -78,7 +79,7 @@ class RecoverableExchangeImpl : public RecoverableExchange
public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
- void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
+ void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
};
class RecoverableConfigImpl : public RecoverableConfig
@@ -102,18 +103,24 @@ public:
RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
{
- return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
+ Exchange::shared_ptr e = Exchange::decode(exchanges, buffer);
+ if (e) {
+ return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(e, queues));
+ } else {
+ return RecoverableExchange::shared_ptr();
+ }
}
RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
{
- Queue::shared_ptr queue = Queue::decode(queues, buffer);
+ Queue::shared_ptr queue = Queue::decode(queues, buffer, true);
try {
Exchange::shared_ptr exchange = exchanges.getDefault();
if (exchange) {
exchange->bind(queue, queue->getName(), 0);
+ queue->bound(exchange->getName(), queue->getName(), framing::FieldTable());
}
- } catch (const framing::NotFoundException& e) {
+ } catch (const framing::NotFoundException& /*e*/) {
//assume no default exchange has been declared
}
return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue));
@@ -149,7 +156,16 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer
void RecoveryManagerImpl::recoveryComplete()
{
- //TODO (finalise binding setup etc)
+ //notify all queues and exchanges
+ queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges)));
+ exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
+}
+
+RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
+{
+ if (!msg->isPersistent()) {
+ msg->forcePersistent(); // set so that message will get dequeued from store.
+ }
}
bool RecoverableMessageImpl::loadContent(uint64_t available)
@@ -172,6 +188,11 @@ void RecoverableMessageImpl::setPersistenceId(uint64_t id)
msg->setPersistenceId(id);
}
+void RecoverableMessageImpl::setRedelivered()
+{
+ msg->redeliver();
+}
+
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
{
dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
@@ -181,7 +202,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
@@ -215,10 +236,13 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id)
bridge->setPersistenceId(id);
}
-void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
+void RecoverableExchangeImpl::bind(const string& queueName,
+ const string& key,
+ framing::FieldTable& args)
{
Queue::shared_ptr queue = queues.find(queueName);
exchange->bind(queue, key, &args);
+ queue->bound(exchange->getName(), key, args);
}
void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
@@ -251,3 +275,5 @@ void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, Rec
{
dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message);
}
+
+}}