summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-02-21 21:23:37 +0000
committerGordon Sim <gsim@apache.org>2008-02-21 21:23:37 +0000
commitb898761ff46be9bd8de073a4bec260825fdefa47 (patch)
tree45387c6c4a000cb7056ea0418a1c6eb2974bfbf6 /cpp/src
parent313ac7bf57e62ebc0fd0ef1012617b61fb4a952b (diff)
downloadqpid-python-b898761ff46be9bd8de073a4bec260825fdefa47.tar.gz
Fixes to prevent problems with async store when queue is deleted before all messages are completed or dequeued
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629999 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp5
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h26
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/Queue.h5
4 files changed, 26 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index f3ca574503..3bf390faf3 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -38,7 +38,10 @@ void PersistableMessage::flush()
}
}
for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
- store->flush(*(*i));
+ PersistableQueue::shared_ptr q(i->lock());
+ if (q) {
+ store->flush(*q);
+ }
}
}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 883acff497..d5977665fe 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -25,6 +25,7 @@
#include <string>
#include <list>
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Monitor.h"
@@ -63,12 +64,12 @@ class PersistableMessage : public Persistable
*/
int asyncDequeueCounter;
protected:
- typedef std::list<PersistableQueue*> syncList;
- syncList synclist;
- MessageStore* store;
- bool contentReleased;
-
- inline void setContentReleased() {contentReleased = true; }
+ typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
+ syncList synclist;
+ MessageStore* store;
+ bool contentReleased;
+
+ inline void setContentReleased() {contentReleased = true; }
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -118,18 +119,20 @@ public:
sys::ScopedLock<sys::Mutex> l(storeLock);
if (store) {
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- (*i)->notifyDurableIOComplete();
+ PersistableQueue::shared_ptr q(i->lock());
+ if (q) q->notifyDurableIOComplete();
}
//synclist.clear();
}
}
}
- inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) {
+ inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
- synclist.push_back(queue);
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
}
enqueueAsync();
}
@@ -161,11 +164,12 @@ public:
}
}
- inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) {
+ inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
- synclist.push_back(queue);
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
}
dequeueAsync();
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d34ca06364..abe4f3f9a5 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -437,7 +437,7 @@ bool Queue::canAutoDelete() const{
bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
- msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
@@ -450,7 +450,7 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
- msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
+ msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
store->dequeue(ctxt, pmsg, *this);
return true;
@@ -498,7 +498,9 @@ void Queue::destroy()
}
if (store) {
+ store->flush(*this);
store->destroy(*this);
+ store = 0;//ensure we make no more calls to the store for this queue
}
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 4018f91367..aaae175be8 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -26,6 +26,7 @@
#include <deque>
#include <set>
#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
#include "Consumer.h"
@@ -55,13 +56,13 @@ namespace qpid {
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue, public management::Manageable {
+ class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable {
typedef std::set<Consumer*> Listeners;
typedef std::deque<QueuedMessage> Messages;
const string name;
const bool autodelete;
- MessageStore* const store;
+ MessageStore* store;
const ConnectionToken* owner;
uint32_t consumerCount;
bool exclusive;