From 248f1fe188fe2307b9dcf2c87a83b653eaa1920c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Sat, 26 Dec 2009 12:42:57 +0000 Subject: synchronized with trunk except for ruby dir git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/SessionState.cpp | 66 ++++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 17 deletions(-) (limited to 'cpp/src/qpid/SessionState.cpp') diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 1be0111489..4f370c6765 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -19,9 +19,10 @@ * */ -#include "SessionState.h" +#include "qpid/SessionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/enum.h" #include "qpid/log/Statement.h" #include #include @@ -37,10 +38,10 @@ using framing::FramingErrorException; namespace { bool isControl(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::CONTROL; + return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL; } bool isCommand(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::COMMAND; + return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND; } } // namespace @@ -60,7 +61,7 @@ void SessionPoint::advance(const AMQFrame& f) { if (f.isLastSegment() && f.isLastFrame()) ++command; // Single-frame command. else - offset += f.size(); + offset += f.encodedSize(); } else { // continuation frame for partial command if (offset == 0) @@ -76,7 +77,7 @@ void SessionPoint::advance(const AMQFrame& f) { // that the relationship of fragment offsets to the replay // list can be computed more easily. // - offset += f.size(); + offset += f.encodedSize(); } } } @@ -112,12 +113,13 @@ SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expec void SessionState::senderRecord(const AMQFrame& f) { if (isControl(f)) return; // Ignore control frames. - QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getMethod()); + QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody()); + stateful = true; if (timeout) sender.replayList.push_back(f); - sender.unflushedSize += f.size(); - sender.bytesSinceKnownCompleted += f.size(); - sender.replaySize += f.size(); + sender.unflushedSize += f.encodedSize(); + sender.bytesSinceKnownCompleted += f.encodedSize(); + sender.replaySize += f.encodedSize(); sender.incomplete += sender.sendPoint.command; sender.sendPoint.advance(f); if (config.replayHardLimit && config.replayHardLimit < sender.replaySize) @@ -146,15 +148,15 @@ void SessionState::senderRecordKnownCompleted() { void SessionState::senderConfirmed(const SessionPoint& confirmed) { if (confirmed > sender.sendPoint) - throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent.")); + throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint)); QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed); ReplayList::iterator i = sender.replayList.begin(); while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { sender.replayPoint.advance(*i); assert(sender.replayPoint <= sender.sendPoint); - sender.replaySize -= i->size(); + sender.replaySize -= i->encodedSize(); if (sender.replayPoint > sender.flushPoint) - sender.unflushedSize -= i->size(); + sender.unflushedSize -= i->encodedSize(); ++i; } if (sender.replayPoint > sender.flushPoint) @@ -168,7 +170,7 @@ void SessionState::senderCompleted(const SequenceSet& commands) { QPID_LOG(debug, getId() << ": sender marked completed: " << commands); sender.incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order - // confirmation, so confirm only the first contiguous range of commands. + // confirmation, so confirm up to the end of the first contiguous range of commands. senderConfirmed(SessionPoint(commands.rangesBegin()->end())); } @@ -182,21 +184,23 @@ void SessionState::receiverSetCommandPoint(const SessionPoint& point) { } bool SessionState::receiverRecord(const AMQFrame& f) { + if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges if (isControl(f)) return true; // Ignore control frames. stateful = true; receiver.expected.advance(f); - receiver.bytesSinceKnownCompleted += f.size(); + receiver.bytesSinceKnownCompleted += f.encodedSize(); bool firstTime = receiver.expected > receiver.received; if (firstTime) { receiver.received = receiver.expected; receiver.incomplete += receiverGetCurrent(); } - QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod()); - QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f); + QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody()); + if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame."); return firstTime; } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { + if (receiverTrackingDisabled) return; //Very nasty hack for push bridges assert(receiver.incomplete.contains(command)); // Internal error to complete command twice. SequenceNumber first =cumulative ? receiver.incomplete.front() : command; SequenceNumber last = command; @@ -236,7 +240,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(), config(c), stateful() + : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false) { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } @@ -249,4 +253,32 @@ std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { return o << "(" << p.command.getValue() << "+" << p.offset << ")"; } +void SessionState::setState( + const SequenceNumber& replayStart, + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete +) +{ + sender.replayPoint = replayStart; + sender.flushPoint = sendCommandPoint; + sender.sendPoint = sendCommandPoint; + sender.unflushedSize = 0; + sender.replaySize = 0; // Replay list will be updated separately. + sender.incomplete = sentIncomplete; + sender.bytesSinceKnownCompleted = 0; + + receiver.expected = expected; + receiver.received = received; + receiver.unknownCompleted = unknownCompleted; + receiver.incomplete = receivedIncomplete; + receiver.bytesSinceKnownCompleted = 0; +} + +void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; } +void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; } + } // namespace qpid -- cgit v1.2.1