summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r--cpp/src/qpid/broker/Queue.h24
1 files changed, 19 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e02444642b..6e859e67bb 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -48,20 +48,32 @@ namespace qpid {
using std::string;
+ struct DispatchCompletion
+ {
+ virtual ~DispatchCompletion() {}
+ virtual void completed() = 0;
+ };
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue{
+ class Queue : public PersistableQueue {
typedef std::vector<Consumer*> Consumers;
typedef std::deque<QueuedMessage> Messages;
- struct DispatchFunctor {
+ struct DispatchFunctor
+ {
Queue& queue;
- DispatchFunctor(Queue& q) : queue(q) {}
- void operator()() { queue.dispatch(); }
+ DispatchCompletion* sync;
+ DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
+ void operator()()
+ {
+ queue.dispatch();
+ if (sync) sync->completed();
+ }
};
const string name;
@@ -93,6 +105,7 @@ namespace qpid {
*/
void dispatch();
void cancel(Consumer* c, Consumers& set);
+ void serviceAllBrowsers();
void serviceBrowser(Consumer* c);
Consumer* allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
@@ -149,7 +162,8 @@ namespace qpid {
* at any time, so this call schedules the despatch based on
* the serilizer policy.
*/
- void requestDispatch(Consumer* c = 0, bool sync = false);
+ void requestDispatch(Consumer* c = 0);
+ void flush(DispatchCompletion& callback);
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();