summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r--cpp/src/qpid/SessionState.cpp164
1 files changed, 86 insertions, 78 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 8905fb5f9d..2a6b8303b3 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -31,6 +31,7 @@ using framing::AMQFrame;
using amqp_0_10::NotImplementedException;
using amqp_0_10::InvalidArgumentException;
using amqp_0_10::IllegalStateException;
+using amqp_0_10::ResourceLimitExceededException;
namespace {
bool isControl(const AMQFrame& f) {
@@ -63,78 +64,83 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
-SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {}
+SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
+SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
+SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
-const SessionPoint& SessionState::SendState::getCommandPoint() {
- return sendPoint;
+SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) {
+ if (expect < sender.replayPoint || sender.sendPoint < expect)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range."));
+ ReplayList::iterator i = sender.replayList.begin();
+ SessionPoint p = sender.replayPoint;
+ while (i != sender.replayList.end() && p.command < expect.command)
+ p.advance(*i++);
+ assert(p.command == expect.command);
+ return boost::make_iterator_range(i, sender.replayList.end());
}
-bool SessionState::SendState::expected(const SessionPoint& point) {
- if (point < replayPoint || sendPoint < point)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range."));
- // FIXME aconway 2008-05-06: this is not strictly correct, we should keep
- // an intermediate replay pointer into the replay list.
- confirmed(point); // Drop commands prior to expected from replay.
- return (!replayList.empty());
-}
-
-void SessionState::SendState::record(const AMQFrame& f) {
+void SessionState::senderRecord(const AMQFrame& f) {
if (isControl(f)) return; // Ignore control frames.
- session->stateful = true;
- replayList.push_back(f);
- unflushedSize += f.size();
- incomplete += sendPoint.command;
- sendPoint.advance(f);
-}
-
-bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; }
-
-void SessionState::SendState::recordFlush() {
- assert(flushPoint <= sendPoint);
- flushPoint = sendPoint;
- unflushedSize = 0;
-}
-
-void SessionState::SendState::confirmed(const SessionPoint& confirmed) {
- if (confirmed > sendPoint)
- throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent."));
- ReplayList::iterator i = replayList.begin();
- while (i != replayList.end() && replayPoint.command < confirmed.command) {
- replayPoint.advance(*i);
- assert(replayPoint <= sendPoint);
- if (replayPoint > flushPoint)
- unflushedSize -= i->size();
+ stateful = true;
+ sender.replayList.push_back(f);
+ sender.unflushedSize += f.size();
+ sender.replaySize += f.size();
+ sender.incomplete += sender.sendPoint.command;
+ sender.sendPoint.advance(f);
+ if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
+ throw ResourceLimitExceededException("Replay bufffer exceeeded hard limit");
+}
+
+bool SessionState::senderNeedFlush() const {
+ return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordFlush() {
+ assert(sender.flushPoint <= sender.sendPoint);
+ sender.flushPoint = sender.sendPoint;
+ sender.unflushedSize = 0;
+}
+
+void SessionState::senderConfirmed(const SessionPoint& confirmed) {
+ if (confirmed > sender.sendPoint)
+ throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+ ReplayList::iterator i = sender.replayList.begin();
+ while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
+ sender.replayPoint.advance(*i);
+ assert(sender.replayPoint <= sender.sendPoint);
+ sender.replaySize -= i->size();
+ if (sender.replayPoint > sender.flushPoint)
+ sender.unflushedSize -= i->size();
++i;
}
- if (replayPoint > flushPoint) flushPoint = replayPoint;
- replayList.erase(replayList.begin(), i);
- assert(replayPoint.offset == 0);
+ if (sender.replayPoint > sender.flushPoint)
+ sender.flushPoint = sender.replayPoint;
+ sender.replayList.erase(sender.replayList.begin(), i);
+ assert(sender.replayPoint.offset == 0);
}
-void SessionState::SendState::completed(const SequenceSet& commands) {
+void SessionState::senderCompleted(const SequenceSet& commands) {
if (commands.empty()) return;
- incomplete -= commands;
+ sender.incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
// confirmation, so confirm only the first contiguous range of commands.
- confirmed(SessionPoint(commands.rangesBegin()->end()));
+ senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
}
-SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {}
-
-void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) {
- if (session->hasState() && point > received)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range."));
- expected = point;
- if (expected > received)
- received = expected;
+void SessionState::receiverSetCommandPoint(const SessionPoint& point) {
+ if (hasState() && point > receiver.received)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range."));
+ receiver.expected = point;
+ if (receiver.expected > receiver.received)
+ receiver.received = receiver.expected;
}
-bool SessionState::ReceiveState::record(const AMQFrame& f) {
+bool SessionState::receiverRecord(const AMQFrame& f) {
if (isControl(f)) return true; // Ignore control frames.
- session->stateful = true;
- expected.advance(f);
- if (expected > received) {
- received = expected;
+ stateful = true;
+ receiver.expected.advance(f);
+ if (receiver.expected > receiver.received) {
+ receiver.received = receiver.expected;
return true;
}
else {
@@ -143,41 +149,43 @@ bool SessionState::ReceiveState::record(const AMQFrame& f) {
}
}
-void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) {
- assert(command <= received.command); // Internal error to complete an unreceived command.
- assert(firstIncomplete <= command);
+void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ assert(command <= receiver.received.command); // Internal error to complete an unreceived command.
+ assert(receiver.firstIncomplete <= command);
if (cumulative)
- unknownCompleted.add(firstIncomplete, command);
+ receiver.unknownCompleted.add(receiver.firstIncomplete, command);
else
- unknownCompleted += command;
- firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end();
+ receiver.unknownCompleted += command;
+ receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end();
+ QPID_LOG(debug, "Completed " << command << " unknown=" << receiver.unknownCompleted);
}
-void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) {
- if (!commands.empty() && commands.back() > received.command)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands."));
- unknownCompleted -= commands;
+void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
+ if (!commands.empty() && commands.back() > receiver.received.command)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands."));
+ receiver.unknownCompleted -= commands;
}
-SequenceNumber SessionState::ReceiveState::getCurrent() const {
- SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic.
- return --current;
+const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
+const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
+const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
+
+SequenceNumber SessionState::receiverGetCurrent() const {
+ SequenceNumber current = receiver.expected.command;
+ if (receiver.expected.offset == 0)
+ --current;
+ return current;
}
-// FIXME aconway 2008-05-02: implement sync & kill limits.
-SessionState::Configuration::Configuration()
- : replaySyncSize(std::numeric_limits<size_t>::max()),
- replayKillSize(std::numeric_limits<size_t>::max()) {}
+SessionState::Configuration::Configuration(size_t flush, size_t hard) :
+ replayFlushLimit(flush), replayHardLimit(hard) {}
-SessionState::SessionState(const SessionId& i, const Configuration& c)
- : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful()
+SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful()
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
-bool SessionState::hasState() const {
- return stateful;
-}
+bool SessionState::hasState() const { return stateful; }
SessionState::~SessionState() {}