diff options
author | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
commit | 0333573627c831142aa251bfb1cabdb1e2bf438e (patch) | |
tree | 953bf8c624374c57953aa3f2888254d175609d9a /cpp/src/qpid/SessionState.cpp | |
parent | 96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff) | |
download | qpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz |
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 164 |
1 files changed, 86 insertions, 78 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 8905fb5f9d..2a6b8303b3 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -31,6 +31,7 @@ using framing::AMQFrame; using amqp_0_10::NotImplementedException; using amqp_0_10::InvalidArgumentException; using amqp_0_10::IllegalStateException; +using amqp_0_10::ResourceLimitExceededException; namespace { bool isControl(const AMQFrame& f) { @@ -63,78 +64,83 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } -SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} +SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } +SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } +SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } -const SessionPoint& SessionState::SendState::getCommandPoint() { - return sendPoint; +SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) { + if (expect < sender.replayPoint || sender.sendPoint < expect) + throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range.")); + ReplayList::iterator i = sender.replayList.begin(); + SessionPoint p = sender.replayPoint; + while (i != sender.replayList.end() && p.command < expect.command) + p.advance(*i++); + assert(p.command == expect.command); + return boost::make_iterator_range(i, sender.replayList.end()); } -bool SessionState::SendState::expected(const SessionPoint& point) { - if (point < replayPoint || sendPoint < point) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); - // FIXME aconway 2008-05-06: this is not strictly correct, we should keep - // an intermediate replay pointer into the replay list. - confirmed(point); // Drop commands prior to expected from replay. - return (!replayList.empty()); -} - -void SessionState::SendState::record(const AMQFrame& f) { +void SessionState::senderRecord(const AMQFrame& f) { if (isControl(f)) return; // Ignore control frames. - session->stateful = true; - replayList.push_back(f); - unflushedSize += f.size(); - incomplete += sendPoint.command; - sendPoint.advance(f); -} - -bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } - -void SessionState::SendState::recordFlush() { - assert(flushPoint <= sendPoint); - flushPoint = sendPoint; - unflushedSize = 0; -} - -void SessionState::SendState::confirmed(const SessionPoint& confirmed) { - if (confirmed > sendPoint) - throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); - ReplayList::iterator i = replayList.begin(); - while (i != replayList.end() && replayPoint.command < confirmed.command) { - replayPoint.advance(*i); - assert(replayPoint <= sendPoint); - if (replayPoint > flushPoint) - unflushedSize -= i->size(); + stateful = true; + sender.replayList.push_back(f); + sender.unflushedSize += f.size(); + sender.replaySize += f.size(); + sender.incomplete += sender.sendPoint.command; + sender.sendPoint.advance(f); + if (config.replayHardLimit && config.replayHardLimit < sender.replaySize) + throw ResourceLimitExceededException("Replay bufffer exceeeded hard limit"); +} + +bool SessionState::senderNeedFlush() const { + return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit; +} + +void SessionState::senderRecordFlush() { + assert(sender.flushPoint <= sender.sendPoint); + sender.flushPoint = sender.sendPoint; + sender.unflushedSize = 0; +} + +void SessionState::senderConfirmed(const SessionPoint& confirmed) { + if (confirmed > sender.sendPoint) + throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent.")); + ReplayList::iterator i = sender.replayList.begin(); + while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { + sender.replayPoint.advance(*i); + assert(sender.replayPoint <= sender.sendPoint); + sender.replaySize -= i->size(); + if (sender.replayPoint > sender.flushPoint) + sender.unflushedSize -= i->size(); ++i; } - if (replayPoint > flushPoint) flushPoint = replayPoint; - replayList.erase(replayList.begin(), i); - assert(replayPoint.offset == 0); + if (sender.replayPoint > sender.flushPoint) + sender.flushPoint = sender.replayPoint; + sender.replayList.erase(sender.replayList.begin(), i); + assert(sender.replayPoint.offset == 0); } -void SessionState::SendState::completed(const SequenceSet& commands) { +void SessionState::senderCompleted(const SequenceSet& commands) { if (commands.empty()) return; - incomplete -= commands; + sender.incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order // confirmation, so confirm only the first contiguous range of commands. - confirmed(SessionPoint(commands.rangesBegin()->end())); + senderConfirmed(SessionPoint(commands.rangesBegin()->end())); } -SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} - -void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { - if (session->hasState() && point > received) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); - expected = point; - if (expected > received) - received = expected; +void SessionState::receiverSetCommandPoint(const SessionPoint& point) { + if (hasState() && point > receiver.received) + throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range.")); + receiver.expected = point; + if (receiver.expected > receiver.received) + receiver.received = receiver.expected; } -bool SessionState::ReceiveState::record(const AMQFrame& f) { +bool SessionState::receiverRecord(const AMQFrame& f) { if (isControl(f)) return true; // Ignore control frames. - session->stateful = true; - expected.advance(f); - if (expected > received) { - received = expected; + stateful = true; + receiver.expected.advance(f); + if (receiver.expected > receiver.received) { + receiver.received = receiver.expected; return true; } else { @@ -143,41 +149,43 @@ bool SessionState::ReceiveState::record(const AMQFrame& f) { } } -void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { - assert(command <= received.command); // Internal error to complete an unreceived command. - assert(firstIncomplete <= command); +void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { + assert(command <= receiver.received.command); // Internal error to complete an unreceived command. + assert(receiver.firstIncomplete <= command); if (cumulative) - unknownCompleted.add(firstIncomplete, command); + receiver.unknownCompleted.add(receiver.firstIncomplete, command); else - unknownCompleted += command; - firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); + receiver.unknownCompleted += command; + receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end(); + QPID_LOG(debug, "Completed " << command << " unknown=" << receiver.unknownCompleted); } -void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { - if (!commands.empty() && commands.back() > received.command) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); - unknownCompleted -= commands; +void SessionState::receiverKnownCompleted(const SequenceSet& commands) { + if (!commands.empty() && commands.back() > receiver.received.command) + throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands.")); + receiver.unknownCompleted -= commands; } -SequenceNumber SessionState::ReceiveState::getCurrent() const { - SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. - return --current; +const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } +const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } +const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } + +SequenceNumber SessionState::receiverGetCurrent() const { + SequenceNumber current = receiver.expected.command; + if (receiver.expected.offset == 0) + --current; + return current; } -// FIXME aconway 2008-05-02: implement sync & kill limits. -SessionState::Configuration::Configuration() - : replaySyncSize(std::numeric_limits<size_t>::max()), - replayKillSize(std::numeric_limits<size_t>::max()) {} +SessionState::Configuration::Configuration(size_t flush, size_t hard) : + replayFlushLimit(flush), replayHardLimit(hard) {} -SessionState::SessionState(const SessionId& i, const Configuration& c) - : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful() { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } -bool SessionState::hasState() const { - return stateful; -} +bool SessionState::hasState() const { return stateful; } SessionState::~SessionState() {} |