From b33a63b36c659a894143382d0a61efe6a598fcc6 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 6 Sep 2007 20:27:33 +0000 Subject: Implementation of execution.result on the client side git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/CompletionTracker.cpp | 74 +++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 9 deletions(-) (limited to 'cpp/src/qpid/client/CompletionTracker.cpp') diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 996971dbd2..46a7384ac2 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -20,45 +20,101 @@ */ #include "CompletionTracker.h" +#include using qpid::client::CompletionTracker; using namespace qpid::framing; using namespace boost; +namespace +{ +const std::string empty; +} + CompletionTracker::CompletionTracker() {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} +void CompletionTracker::close() +{ + sys::Mutex::ScopedLock l(lock); + while (!listeners.empty()) { + Record r(listeners.front()); + { + sys::Mutex::ScopedUnlock u(lock); + r.completed(); + } + listeners.pop_front(); + } +} void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; - while (!listeners.empty() && !(listeners.front().first > mark)) { - Listener f(listeners.front().second); + while (!listeners.empty() && !(listeners.front().id > mark)) { + Record r(listeners.front()); { sys::Mutex::ScopedUnlock u(lock); - f(); + r.completed(); } - listeners.pop(); + listeners.pop_front(); + } +} + +void CompletionTracker::received(const SequenceNumber& id, const std::string& result) +{ + sys::Mutex::ScopedLock l(lock); + Listeners::iterator i = seek(id); + if (i != listeners.end() && i->id == id) { + i->received(result); + listeners.erase(i); } } -void CompletionTracker::listen(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) { - if (!add(point, listener)) { + if (!add(Record(point, listener))) { listener(); } } -bool CompletionTracker::add(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) +{ + if (!add(Record(point, listener))) { + listener(empty); + } +} + +bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (point < mark) { + if (record.id < mark) { return false; } else { - listeners.push(make_pair(point, listener)); + //insert at the correct position + Listeners::iterator i = seek(record.id); + if (i == listeners.end()) i = listeners.begin(); + listeners.insert(i, record); + return true; } } +CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) +{ + Listeners::iterator i = listeners.begin(); + while (i != listeners.end() && i->id < point) i++; + return i; +} + +void CompletionTracker::Record::completed() +{ + if (f) f(); + else if(g) g(empty);//won't get a result if command is now complete +} + +void CompletionTracker::Record::received(const std::string& result) +{ + if (g) g(result); +} -- cgit v1.2.1