diff options
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/SessionState.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SessionState.h | 25 |
2 files changed, 40 insertions, 6 deletions
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp index 7e905bdf63..8056f4a523 100644 --- a/cpp/src/qpid/framing/SessionState.cpp +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -33,6 +33,7 @@ namespace qpid { namespace framing { SessionState::SessionState(uint32_t ack, const Uuid& uuid) : + state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -44,6 +45,7 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) : {} SessionState::SessionState(const Uuid& uuid) : + state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -63,6 +65,10 @@ bool isSessionCommand(const AMQFrame& f) { boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { if (isSessionCommand(f)) return boost::none; + if (state==RESUMING) + throw CommandInvalidException( + QPID_MSG("Invalid frame: Resuming session, expected session-ack")); + assert(state = ATTACHED); ++lastReceived; QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); if (ackInterval && lastReceived == sendAckAt) @@ -79,6 +85,7 @@ bool SessionState::sent(const AMQFrame& f) { ++lastSent; QPID_LOG(trace, "Sent # "<< lastSent << " " << id); return ackInterval && + (state!=RESUMING) && (lastSent == solicitAckAt) && sendingSolicit(); } @@ -90,6 +97,8 @@ SessionState::Replay SessionState::replay() { } void SessionState::receivedAck(SequenceNumber acked) { + if (state==RESUMING) state=ATTACHED; + assert(state==ATTACHED); if (lastSent < acked) throw InvalidArgumentException("Invalid sequence number in ack"); size_t keep = lastSent - acked; @@ -104,10 +113,22 @@ SequenceNumber SessionState::sendingAck() { } bool SessionState::sendingSolicit() { + assert(state == ATTACHED); if (ackSolicited) return false; solicitAckAt = lastSent + ackInterval; return ackInterval != 0; } +SequenceNumber SessionState::resuming() { + if (!resumable) + throw InternalErrorException("Session is not resumable"); + state = RESUMING; + return sendingAck(); +} + +void SessionState::suspend() { + state = SUSPENDED; +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h index 5034de4e94..361c960db1 100644 --- a/cpp/src/qpid/framing/SessionState.h +++ b/cpp/src/qpid/framing/SessionState.h @@ -35,10 +35,7 @@ namespace framing { /** * Session state common to client and broker. - * - * Stores data needed to resume a session: replay frames, implements - * session ack/resume protcools. Stores handler chains for the session, - * handlers may themselves store state. + * Stores replay frames, implements session ack/resume protcools. * * A SessionState is always associated with an _open_ session (attached or * suspended) it is destroyed when the session is closed. @@ -49,6 +46,13 @@ class SessionState public: typedef std::vector<AMQFrame> Replay; + /** States of a session. */ + enum State { + SUSPENDED, ///< Suspended, detached from any channel. + RESUMING, ///< Resuming: waiting for initial ack from peer. + ATTACHED ///< Attached to channel and operating normally. + }; + /** *Create a newly opened active session. *@param ackInterval send/solicit an ack whenever N unacked frames @@ -56,8 +60,7 @@ class SessionState * * N=0 disables voluntary send/solict ack. */ - SessionState(uint32_t ackInterval, - const framing::Uuid& id=framing::Uuid(true)); + SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true)); /** * Create a non-resumable session. Does not store session frames, @@ -66,6 +69,7 @@ class SessionState SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } + State getState() const { return state; } /** Received incoming L3 frame. * @return SequenceNumber if an ack should be sent, empty otherwise. @@ -88,6 +92,13 @@ class SessionState */ Replay replay(); + /** Suspend the session. */ + void suspend(); + + /** Start resume protocol for the session. + *@returns sequence number to ack immediately. */ + SequenceNumber resuming(); + /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. * * Note: when received() returns a sequence number this function @@ -104,7 +115,9 @@ class SessionState bool sendingSolicit(); + State state; framing::Uuid id; + Unacked unackedOut; SequenceNumber lastReceived; SequenceNumber lastSent; |
