diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
| commit | b4dac41573e33e1a04a2b7b8c9a35f5e72b662bc (patch) | |
| tree | 15e25a2dc7b42f3788f9dccdd7e632dcf8c346d2 /cpp/src/qpid/broker/SessionState.cpp | |
| parent | 928699508993a5ccc59254027773281883d1e973 (diff) | |
| download | qpid-python-b4dac41573e33e1a04a2b7b8c9a35f5e72b662bc.tar.gz | |
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema.
The preview codepath (99-0) remains unaltered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 49 |
1 files changed, 13 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 573a567da6..5f04136444 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -49,7 +49,7 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = next++; Invoker::Result invocation = invoke(adapter, *method); - incoming.complete(id); + completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); @@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) getProxy().getExecution().result(id.getValue(), invocation.getResult()); } if (method->isSync()) { - incoming.sync(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(incoming.next()); + SequenceNumber id = next++; + msgBuilder.start(id); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); @@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - incoming.track(msg); + //TODO: may want to hold up execution until async enqueue is complete + completed.add(msg->getCommandId()); if (msg->getFrames().getMethod()->isSync()) { - incoming.sync(msg->getCommandId()); sendCompletion(); } } @@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame) void SessionState::handle(AMQFrame& frame) { + received(frame); + //TODO: make command handling more uniform, regardless of whether //commands carry content. (For now, assume all single frame //assmblies are non-content bearing and all content-bearing @@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t void SessionState::sendCompletion() { - SequenceNumber mark = incoming.getMark(); - SequenceNumberSet range = incoming.getRange(); - getProxy().getExecution().complete(mark.getValue(), range); -} - -void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - //record: - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - //ack messages: - semanticState.ackCumulative(mark.getValue()); - } - range.processRanges(ackOp); -} - -void SessionState::flush() -{ - incoming.flush(); - sendCompletion(); -} - -void SessionState::sync() -{ - incoming.sync(); - sendCompletion(); + handler->sendCompletion(); } -void SessionState::noop() +void SessionState::complete(const SequenceSet& commands) { - incoming.noop(); + knownCompleted.add(commands); + commands.for_each(ackOp); } |
