diff options
| author | Gordon Sim <gsim@apache.org> | 2007-11-19 12:18:26 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-11-19 12:18:26 +0000 |
| commit | 73eee018d301031a212fe3c8a8127b84c2b580ac (patch) | |
| tree | 8f6361112c086c11613601fa699c910b5b5e8e8d /cpp/src/qpid/broker/Queue.cpp | |
| parent | 2c3e3bf4c62267ac6a0fe1f5d6a6288a927ace0b (diff) | |
| download | qpid-python-73eee018d301031a212fe3c8a8127b84c2b580ac.tar.gz | |
Fixes causing lost 'events' in queue dispatch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@596277 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 20 |
1 files changed, 6 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 757f0aa62d..41a5767457 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -46,8 +46,7 @@ Queue::Queue(const string& _name, bool _autodelete, const ConnectionToken* const _owner, Manageable* parent) : - dispatching(false), - name(_name), + name(_name), autodelete(_autodelete), store(_store), owner(_owner), @@ -76,8 +75,7 @@ void Queue::notifyDurableIOComplete() { // signal SemanticHander to ack completed dequeues // then dispatch to ack... - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } @@ -102,8 +100,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } } @@ -130,8 +127,7 @@ void Queue::process(intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject != 0) mgmtObject->enqueue (msg->contentSize (), mask); - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } @@ -141,8 +137,7 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } bool Queue::acquire(const QueuedMessage& msg) { @@ -158,8 +153,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } else { DispatchFunctor f(*this, c); serializer.execute(f); @@ -235,7 +229,6 @@ bool Queue::getNextMessage(QueuedMessage& msg) void Queue::dispatch() { - dispatching = true; QueuedMessage msg(this); while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ if (dispatch(msg)) { @@ -249,7 +242,6 @@ void Queue::dispatch() } } serviceAllBrowsers(); - dispatching = false; } void Queue::serviceAllBrowsers() |
