From f9380575ddccbe48edd5305e96db70892c1dc1aa Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 27 Mar 2008 18:04:42 +0000 Subject: Send accept in response to message publications if required. Hold up completion (and accept) until message from transfer is fully enqueued. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641929 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SessionState.cpp | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) (limited to 'cpp/src/qpid/broker/SessionState.cpp') diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 35ad562a22..19fb0a4a79 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -50,7 +50,8 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - //TODO: may want to hold up execution until async enqueue is complete - completed.add(msg->getCommandId()); + + if (msg->isEnqueueComplete()) { + enqueued(msg); + } else { + incomplete.add(msg); + } + + //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); + } else { + incomplete.process(enqueuedOp, false); } } } +void SessionState::enqueued(boost::intrusive_ptr msg) +{ + completed.add(msg->getCommandId()); + if (msg->requiresAccept()) { + getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + } +} + void SessionState::handle(AMQFrame& frame) { received(frame); -- cgit v1.2.1