summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing/SessionState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-01 00:38:58 +0000
committerAlan Conway <aconway@apache.org>2007-11-01 00:38:58 +0000
commitd4838b1db929de6d650b7cdf574c04425c01b38d (patch)
treed519ff7d639f6f1bca111bc12930abf1da405e67 /cpp/src/qpid/framing/SessionState.cpp
parentaf6457122a32f1f5a0224fc54f3d0c24377510e3 (diff)
downloadqpid-python-d4838b1db929de6d650b7cdf574c04425c01b38d.tar.gz
Preparation for session thread safety overhaul:
- simplified SessionState, responsibility for protocol states now in Handlers - qpid::RefCounted, qpid::intrusive_ptr reference counting support. - build boost unit tests as single exe, speeds up testing. - fixed leak in AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@590869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing/SessionState.cpp')
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp21
1 files changed, 21 insertions, 0 deletions
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index 7e905bdf63..8056f4a523 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -33,6 +33,7 @@ namespace qpid {
namespace framing {
SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -44,6 +45,7 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
{}
SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -63,6 +65,10 @@ bool isSessionCommand(const AMQFrame& f) {
boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
if (isSessionCommand(f))
return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
if (ackInterval && lastReceived == sendAckAt)
@@ -79,6 +85,7 @@ bool SessionState::sent(const AMQFrame& f) {
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -90,6 +97,8 @@ SessionState::Replay SessionState::replay() {
}
void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
if (lastSent < acked)
throw InvalidArgumentException("Invalid sequence number in ack");
size_t keep = lastSent - acked;
@@ -104,10 +113,22 @@ SequenceNumber SessionState::sendingAck() {
}
bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
return ackInterval != 0;
}
+SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
}} // namespace qpid::framing