summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp10
3 files changed, 14 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index e77911bd10..2525fed864 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -51,6 +51,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
ConnectionState(out_, broker_),
receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
closedFn(boost::bind(&Connection::closedImpl, this)),
+ doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
@@ -192,8 +193,9 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
}
}
-bool Connection::doOutput()
-{
+bool Connection::doOutput() { return doOutputFn(); }
+
+bool Connection::doOutputImpl() {
try{
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 0d646bab83..ae8708861a 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -96,7 +96,8 @@ class Connection : public sys::ConnectionInputHandler,
// Extension points: allow plugins to insert additional functionality.
boost::function<void(framing::AMQFrame&)> receivedFn;
- boost::function<void()> closedFn;
+ boost::function<void ()> closedFn;
+ boost::function<bool ()> doOutputFn;
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -104,6 +105,7 @@ class Connection : public sys::ConnectionInputHandler,
void receivedImpl(framing::AMQFrame& frame);
void closedImpl();
+ bool doOutputImpl();
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 2bc366dc86..085578295d 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -380,9 +380,13 @@ void SemanticState::requestDispatch()
void SemanticState::requestDispatch(ConsumerImpl& c)
{
- if(c.isBlocked()) {
- c.doOutput();
- }
+ if(c.isBlocked())
+ outputTasks.activateOutput();
+ // TODO aconway 2008-07-16: we could directly call
+ // c.doOutput();
+ // since we are in the connections thread but for consistency
+ // activateOutput() will set it up to be called in the next write idle.
+ // Current cluster code depends on this, review cluster code to change.
}
void SemanticState::complete(DeliveryRecord& delivery)