summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-23 13:39:07 +0000
committerAlan Conway <aconway@apache.org>2008-05-23 13:39:07 +0000
commit896d94f7c27e958806b96b537a7a96208ede145a (patch)
tree35c139fcc037fde8fef675d9d730882d22781931 /cpp/src/qpid/SessionState.cpp
parentbb4584d228b837fa70839560d72bc2a59dc1aa17 (diff)
downloadqpid-python-896d94f7c27e958806b96b537a7a96208ede145a.tar.gz
qpid::SessionState: Added error checking for invalid frame sequences.
client: Fix client crash on error during connection shutdown. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r--cpp/src/qpid/SessionState.cpp66
1 files changed, 45 insertions, 21 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 96f2cca726..28e433b911 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -20,7 +20,7 @@
*/
#include "SessionState.h"
-#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -28,32 +28,56 @@
namespace qpid {
using framing::AMQFrame;
-using amqp_0_10::NotImplementedException;
-using amqp_0_10::InvalidArgumentException;
-using amqp_0_10::IllegalStateException;
-using amqp_0_10::ResourceLimitExceededException;
-using amqp_0_10::InternalErrorException;
+using framing::NotImplementedException;
+using framing::InvalidArgumentException;
+using framing::IllegalStateException;
+using framing::ResourceLimitExceededException;
+using framing::InternalErrorException;
+using framing::FramingErrorException;
namespace {
bool isControl(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == 0;
+ return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
}
} // namespace
-/** A point in the session - command id + offset */
+SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), offset(o) {}
+
+// TODO aconway 2008-05-22: Do complete frame sequence validity check here,
+// currently duplicated betwen broker and client session impl.
+//
void SessionPoint::advance(const AMQFrame& f) {
- if (f.isLastSegment() && f.isLastFrame()) {
- ++command;
- offset = 0;
+ if (isControl(f)) return; // Ignore controls.
+ if (f.isFirstSegment() && f.isFirstFrame()) {
+ if (offset != 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command start frame."));
+ if (!isCommand(f))
+ throw FramingErrorException(
+ QPID_MSG("Command start frame has invalid type" << f.getBody()->type()));
+ if (f.isLastSegment() && f.isLastFrame())
+ ++command; // Single-frame command.
+ else
+ offset += f.size();
}
- else {
- // TODO aconway 2008-04-24: if we go to support for partial
- // command replay, then it may be better to record the unframed
- // data size in a command point rather than the framed size so
- // that the relationship of fragment offsets to the replay
- // list can be computed more easily.
- //
- offset += f.size();
+ else { // continuation frame for partial command
+ if (offset == 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command continuation frame."));
+ if (f.isLastSegment() && f.isLastFrame()) {
+ ++command;
+ offset = 0;
+ }
+ else {
+ // TODO aconway 2008-04-24: if we go to support for partial
+ // command replay, then it may be better to record the unframed
+ // data size in a command point rather than the framed size so
+ // that the relationship of fragment offsets to the replay
+ // list can be computed more easily.
+ //
+ offset += f.size();
+ }
}
}
@@ -65,6 +89,7 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
+
SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
@@ -156,8 +181,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
- if (!receiver.incomplete.contains(command))
- throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command ));
+ assert(receiver.incomplete.contains(command)); // Internal error to complete command twice.
SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
SequenceNumber last = command;
receiver.unknownCompleted.add(first, last);