summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp21
-rw-r--r--cpp/src/qpid/framing/SessionState.h25
2 files changed, 40 insertions, 6 deletions
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index 7e905bdf63..8056f4a523 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -33,6 +33,7 @@ namespace qpid {
namespace framing {
SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -44,6 +45,7 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
{}
SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -63,6 +65,10 @@ bool isSessionCommand(const AMQFrame& f) {
boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
if (isSessionCommand(f))
return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
if (ackInterval && lastReceived == sendAckAt)
@@ -79,6 +85,7 @@ bool SessionState::sent(const AMQFrame& f) {
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -90,6 +97,8 @@ SessionState::Replay SessionState::replay() {
}
void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
if (lastSent < acked)
throw InvalidArgumentException("Invalid sequence number in ack");
size_t keep = lastSent - acked;
@@ -104,10 +113,22 @@ SequenceNumber SessionState::sendingAck() {
}
bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
return ackInterval != 0;
}
+SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
index 5034de4e94..361c960db1 100644
--- a/cpp/src/qpid/framing/SessionState.h
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -35,10 +35,7 @@ namespace framing {
/**
* Session state common to client and broker.
- *
- * Stores data needed to resume a session: replay frames, implements
- * session ack/resume protcools. Stores handler chains for the session,
- * handlers may themselves store state.
+ * Stores replay frames, implements session ack/resume protcools.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
@@ -49,6 +46,13 @@ class SessionState
public:
typedef std::vector<AMQFrame> Replay;
+ /** States of a session. */
+ enum State {
+ SUSPENDED, ///< Suspended, detached from any channel.
+ RESUMING, ///< Resuming: waiting for initial ack from peer.
+ ATTACHED ///< Attached to channel and operating normally.
+ };
+
/**
*Create a newly opened active session.
*@param ackInterval send/solicit an ack whenever N unacked frames
@@ -56,8 +60,7 @@ class SessionState
*
* N=0 disables voluntary send/solict ack.
*/
- SessionState(uint32_t ackInterval,
- const framing::Uuid& id=framing::Uuid(true));
+ SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
/**
* Create a non-resumable session. Does not store session frames,
@@ -66,6 +69,7 @@ class SessionState
SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
+ State getState() const { return state; }
/** Received incoming L3 frame.
* @return SequenceNumber if an ack should be sent, empty otherwise.
@@ -88,6 +92,13 @@ class SessionState
*/
Replay replay();
+ /** Suspend the session. */
+ void suspend();
+
+ /** Start resume protocol for the session.
+ *@returns sequence number to ack immediately. */
+ SequenceNumber resuming();
+
/** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
*
* Note: when received() returns a sequence number this function
@@ -104,7 +115,9 @@ class SessionState
bool sendingSolicit();
+ State state;
framing::Uuid id;
+
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;