summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.cpp
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
commit248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch)
treed5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/qpid/SessionState.cpp
parent3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff)
downloadqpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r--cpp/src/qpid/SessionState.cpp66
1 files changed, 49 insertions, 17 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 1be0111489..4f370c6765 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -19,9 +19,10 @@
*
*/
-#include "SessionState.h"
+#include "qpid/SessionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <numeric>
@@ -37,10 +38,10 @@ using framing::FramingErrorException;
namespace {
bool isControl(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
}
bool isCommand(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND;
}
} // namespace
@@ -60,7 +61,7 @@ void SessionPoint::advance(const AMQFrame& f) {
if (f.isLastSegment() && f.isLastFrame())
++command; // Single-frame command.
else
- offset += f.size();
+ offset += f.encodedSize();
}
else { // continuation frame for partial command
if (offset == 0)
@@ -76,7 +77,7 @@ void SessionPoint::advance(const AMQFrame& f) {
// that the relationship of fragment offsets to the replay
// list can be computed more easily.
//
- offset += f.size();
+ offset += f.encodedSize();
}
}
}
@@ -112,12 +113,13 @@ SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expec
void SessionState::senderRecord(const AMQFrame& f) {
if (isControl(f)) return; // Ignore control frames.
- QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getMethod());
+ QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody());
+
stateful = true;
if (timeout) sender.replayList.push_back(f);
- sender.unflushedSize += f.size();
- sender.bytesSinceKnownCompleted += f.size();
- sender.replaySize += f.size();
+ sender.unflushedSize += f.encodedSize();
+ sender.bytesSinceKnownCompleted += f.encodedSize();
+ sender.replaySize += f.encodedSize();
sender.incomplete += sender.sendPoint.command;
sender.sendPoint.advance(f);
if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
@@ -146,15 +148,15 @@ void SessionState::senderRecordKnownCompleted() {
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
- throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+ throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint));
QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
ReplayList::iterator i = sender.replayList.begin();
while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
sender.replayPoint.advance(*i);
assert(sender.replayPoint <= sender.sendPoint);
- sender.replaySize -= i->size();
+ sender.replaySize -= i->encodedSize();
if (sender.replayPoint > sender.flushPoint)
- sender.unflushedSize -= i->size();
+ sender.unflushedSize -= i->encodedSize();
++i;
}
if (sender.replayPoint > sender.flushPoint)
@@ -168,7 +170,7 @@ void SessionState::senderCompleted(const SequenceSet& commands) {
QPID_LOG(debug, getId() << ": sender marked completed: " << commands);
sender.incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
- // confirmation, so confirm only the first contiguous range of commands.
+ // confirmation, so confirm up to the end of the first contiguous range of commands.
senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
}
@@ -182,21 +184,23 @@ void SessionState::receiverSetCommandPoint(const SessionPoint& point) {
}
bool SessionState::receiverRecord(const AMQFrame& f) {
+ if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges
if (isControl(f)) return true; // Ignore control frames.
stateful = true;
receiver.expected.advance(f);
- receiver.bytesSinceKnownCompleted += f.size();
+ receiver.bytesSinceKnownCompleted += f.encodedSize();
bool firstTime = receiver.expected > receiver.received;
if (firstTime) {
receiver.received = receiver.expected;
receiver.incomplete += receiverGetCurrent();
}
- QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod());
- QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f);
+ QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody());
+ if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame.");
return firstTime;
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ if (receiverTrackingDisabled) return; //Very nasty hack for push bridges
assert(receiver.incomplete.contains(command)); // Internal error to complete command twice.
SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
SequenceNumber last = command;
@@ -236,7 +240,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : id(i), timeout(), config(c), stateful()
+ : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false)
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
@@ -249,4 +253,32 @@ std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
return o << "(" << p.command.getValue() << "+" << p.offset << ")";
}
+void SessionState::setState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete
+)
+{
+ sender.replayPoint = replayStart;
+ sender.flushPoint = sendCommandPoint;
+ sender.sendPoint = sendCommandPoint;
+ sender.unflushedSize = 0;
+ sender.replaySize = 0; // Replay list will be updated separately.
+ sender.incomplete = sentIncomplete;
+ sender.bytesSinceKnownCompleted = 0;
+
+ receiver.expected = expected;
+ receiver.received = received;
+ receiver.unknownCompleted = unknownCompleted;
+ receiver.incomplete = receivedIncomplete;
+ receiver.bytesSinceKnownCompleted = 0;
+}
+
+void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; }
+void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; }
+
} // namespace qpid