diff options
| author | Gordon Sim <gsim@apache.org> | 2008-02-21 21:23:37 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-02-21 21:23:37 +0000 |
| commit | b898761ff46be9bd8de073a4bec260825fdefa47 (patch) | |
| tree | 45387c6c4a000cb7056ea0418a1c6eb2974bfbf6 /cpp/src | |
| parent | 313ac7bf57e62ebc0fd0ef1012617b61fb4a952b (diff) | |
| download | qpid-python-b898761ff46be9bd8de073a4bec260825fdefa47.tar.gz | |
Fixes to prevent problems with async store when queue is deleted before all messages are completed or dequeued
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629999 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 5 |
4 files changed, 26 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index f3ca574503..3bf390faf3 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -38,7 +38,10 @@ void PersistableMessage::flush() } } for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) { - store->flush(*(*i)); + PersistableQueue::shared_ptr q(i->lock()); + if (q) { + store->flush(*q); + } } } diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 883acff497..d5977665fe 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -25,6 +25,7 @@ #include <string> #include <list> #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> #include "Persistable.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Monitor.h" @@ -63,12 +64,12 @@ class PersistableMessage : public Persistable */ int asyncDequeueCounter; protected: - typedef std::list<PersistableQueue*> syncList; - syncList synclist; - MessageStore* store; - bool contentReleased; - - inline void setContentReleased() {contentReleased = true; } + typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; + syncList synclist; + MessageStore* store; + bool contentReleased; + + inline void setContentReleased() {contentReleased = true; } public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; @@ -118,18 +119,20 @@ public: sys::ScopedLock<sys::Mutex> l(storeLock); if (store) { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - (*i)->notifyDurableIOComplete(); + PersistableQueue::shared_ptr q(i->lock()); + if (q) q->notifyDurableIOComplete(); } //synclist.clear(); } } } - inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { + inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; - synclist.push_back(queue); + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); } enqueueAsync(); } @@ -161,11 +164,12 @@ public: } } - inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { + inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; - synclist.push_back(queue); + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); } dequeueAsync(); } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d34ca06364..abe4f3f9a5 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -437,7 +437,7 @@ bool Queue::canAutoDelete() const{ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { - msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; @@ -450,7 +450,7 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { - msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue + msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); store->dequeue(ctxt, pmsg, *this); return true; @@ -498,7 +498,9 @@ void Queue::destroy() } if (store) { + store->flush(*this); store->destroy(*this); + store = 0;//ensure we make no more calls to the store for this queue } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 4018f91367..aaae175be8 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -26,6 +26,7 @@ #include <deque> #include <set> #include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" #include "Consumer.h" @@ -55,13 +56,13 @@ namespace qpid { * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue, public management::Manageable { + class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { typedef std::set<Consumer*> Listeners; typedef std::deque<QueuedMessage> Messages; const string name; const bool autodelete; - MessageStore* const store; + MessageStore* store; const ConnectionToken* owner; uint32_t consumerCount; bool exclusive; |
