diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 |
5 files changed, 17 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index f0dc159752..2a0aa9ffee 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -311,7 +311,7 @@ void BrokerAdapter::BasicHandlerImpl::consume( if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); + queue->requestDispatch(); } void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 523a834715..9b6bdf5a2b 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -253,7 +253,7 @@ void Channel::ConsumerImpl::cancel(){ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) - queue->dispatch(); + queue->requestDispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg){ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index b26d1d3ed7..cf6beff375 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -87,6 +87,11 @@ void Queue::requeue(Message::shared_ptr& msg){ } +void Queue::requestDispatch(){ + serializer.execute(boost::bind(&Queue::dispatch, this)); +} + + bool Queue::dispatch(Message::shared_ptr& msg){ diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 415e22f04c..0ed368e404 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -81,7 +81,11 @@ namespace qpid { void push(Message::shared_ptr& msg); bool dispatch(Message::shared_ptr& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); - + /** + * only called by serilizer + */ + void dispatch(); + public: typedef boost::shared_ptr<Queue> shared_ptr; @@ -120,12 +124,12 @@ namespace qpid { */ void recover(Message::shared_ptr& msg); /** - * Dispatch any queued messages providing there are + * Request dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching - * at any time, but this method (rather than the caller) - * is responsible for ensuring that. + * at any time, so this call schedules the despatch based on + * the serilizer policy. */ - void dispatch(); + void requestDispatch(); void consume(Consumer* c, bool exclusive = false); void cancel(Consumer* c); uint32_t purge(); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index f586ea92fc..252b465cc5 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -134,7 +134,7 @@ MessageHandlerImpl::consume(const MethodContext& context, noLocal ? &connection : 0, &filter); client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. - queue->dispatch(); + queue->requestDispatch(); } void |
