diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
| commit | 248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch) | |
| tree | d5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/qpid/sys/AggregateOutput.cpp | |
| parent | 3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff) | |
| download | qpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz | |
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AggregateOutput.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 83 |
1 files changed, 53 insertions, 30 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index 2fad28c381..709d3bc640 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,44 +25,67 @@ namespace qpid { namespace sys { - -void AggregateOutput::activateOutput() -{ - control.activateOutput(); -} + +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + +void AggregateOutput::abort() { control.abort(); } + +void AggregateOutput::activateOutput() { control.activateOutput(); } + +void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); - - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; +// Clear the busy flag and notify waiting threads in destructor. +struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); + + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; +} + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - -void AggregateOutput::removeOutputTask(OutputTask* t) + +void AggregateOutput::removeAll() { - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.clear(); } + }} // namespace qpid::sys |
