diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e02444642b..6e859e67bb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -48,20 +48,32 @@ namespace qpid { using std::string; + struct DispatchCompletion + { + virtual ~DispatchCompletion() {} + virtual void completed() = 0; + }; + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue{ + class Queue : public PersistableQueue { typedef std::vector<Consumer*> Consumers; typedef std::deque<QueuedMessage> Messages; - struct DispatchFunctor { + struct DispatchFunctor + { Queue& queue; - DispatchFunctor(Queue& q) : queue(q) {} - void operator()() { queue.dispatch(); } + DispatchCompletion* sync; + DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {} + void operator()() + { + queue.dispatch(); + if (sync) sync->completed(); + } }; const string name; @@ -93,6 +105,7 @@ namespace qpid { */ void dispatch(); void cancel(Consumer* c, Consumers& set); + void serviceAllBrowsers(); void serviceBrowser(Consumer* c); Consumer* allocate(); bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); @@ -149,7 +162,8 @@ namespace qpid { * at any time, so this call schedules the despatch based on * the serilizer policy. */ - void requestDispatch(Consumer* c = 0, bool sync = false); + void requestDispatch(Consumer* c = 0); + void flush(DispatchCompletion& callback); void consume(Consumer* c, bool exclusive = false); void cancel(Consumer* c); uint32_t purge(); |
