summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp11
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h3
2 files changed, 12 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index fc95f46fb9..ff9c740926 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() {
while (!tasks.empty()) {
OutputTask* t=tasks.front();
tasks.pop_front();
+ taskSet.erase(t);
bool didOutput;
{
// Allow concurrent call to addOutputTask.
@@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() {
didOutput = t->doOutput();
}
if (didOutput) {
- tasks.push_back(t);
+ if (taskSet.insert(t).second) {
+ tasks.push_back(t);
+ }
return true;
}
}
@@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() {
void AggregateOutput::addOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
- tasks.push_back(task);
+ if (taskSet.insert(task).second) {
+ tasks.push_back(task);
+ }
}
void AggregateOutput::removeOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.erase(task);
tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
@@ -81,6 +87,7 @@ void AggregateOutput::removeAll()
{
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.clear();
tasks.clear();
}
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index d7c0ff29e3..802722ad26 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -28,6 +28,7 @@
#include <algorithm>
#include <deque>
+#include <set>
namespace qpid {
namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
+ typedef std::set<OutputTask*> TaskSet;
Monitor lock;
TaskList tasks;
+ TaskSet taskSet;
bool busy;
OutputControl& control;