From b898761ff46be9bd8de073a4bec260825fdefa47 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 21 Feb 2008 21:23:37 +0000 Subject: Fixes to prevent problems with async store when queue is deleted before all messages are completed or dequeued git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629999 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/PersistableMessage.cpp | 5 ++++- cpp/src/qpid/broker/PersistableMessage.h | 26 +++++++++++++++----------- cpp/src/qpid/broker/Queue.cpp | 6 ++++-- cpp/src/qpid/broker/Queue.h | 5 +++-- 4 files changed, 26 insertions(+), 16 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index f3ca574503..3bf390faf3 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -38,7 +38,10 @@ void PersistableMessage::flush() } } for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) { - store->flush(*(*i)); + PersistableQueue::shared_ptr q(i->lock()); + if (q) { + store->flush(*q); + } } } diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 883acff497..d5977665fe 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -25,6 +25,7 @@ #include #include #include +#include #include "Persistable.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Monitor.h" @@ -63,12 +64,12 @@ class PersistableMessage : public Persistable */ int asyncDequeueCounter; protected: - typedef std::list syncList; - syncList synclist; - MessageStore* store; - bool contentReleased; - - inline void setContentReleased() {contentReleased = true; } + typedef std::list< boost::weak_ptr > syncList; + syncList synclist; + MessageStore* store; + bool contentReleased; + + inline void setContentReleased() {contentReleased = true; } public: typedef boost::shared_ptr shared_ptr; @@ -118,18 +119,20 @@ public: sys::ScopedLock l(storeLock); if (store) { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - (*i)->notifyDurableIOComplete(); + PersistableQueue::shared_ptr q(i->lock()); + if (q) q->notifyDurableIOComplete(); } //synclist.clear(); } } } - inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { + inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock l(storeLock); store = _store; - synclist.push_back(queue); + boost::weak_ptr q(queue); + synclist.push_back(q); } enqueueAsync(); } @@ -161,11 +164,12 @@ public: } } - inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { + inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock l(storeLock); store = _store; - synclist.push_back(queue); + boost::weak_ptr q(queue); + synclist.push_back(q); } dequeueAsync(); } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d34ca06364..abe4f3f9a5 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -437,7 +437,7 @@ bool Queue::canAutoDelete() const{ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr msg) { if (msg->isPersistent() && store) { - msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue intrusive_ptr pmsg = static_pointer_cast(msg); store->enqueue(ctxt, pmsg, *this); return true; @@ -450,7 +450,7 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr msg) bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr msg) { if (msg->isPersistent() && store) { - msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue + msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue intrusive_ptr pmsg = static_pointer_cast(msg); store->dequeue(ctxt, pmsg, *this); return true; @@ -498,7 +498,9 @@ void Queue::destroy() } if (store) { + store->flush(*this); store->destroy(*this); + store = 0;//ensure we make no more calls to the store for this queue } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 4018f91367..aaae175be8 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" #include "Consumer.h" @@ -55,13 +56,13 @@ namespace qpid { * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue, public management::Manageable { + class Queue : public boost::enable_shared_from_this, public PersistableQueue, public management::Manageable { typedef std::set Listeners; typedef std::deque Messages; const string name; const bool autodelete; - MessageStore* const store; + MessageStore* store; const ConnectionToken* owner; uint32_t consumerCount; bool exclusive; -- cgit v1.2.1