summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/SessionState.h')
-rw-r--r--cpp/src/qpid/SessionState.h144
1 files changed, 64 insertions, 80 deletions
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 7957825dd3..47fabb04c8 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -28,6 +28,7 @@
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FrameHandler.h>
#include <boost/operators.hpp>
+#include <boost/range/iterator_range.hpp>
#include <vector>
#include <iosfwd>
@@ -70,135 +71,118 @@ std::ostream& operator<<(std::ostream&, const SessionPoint&);
* source-incompatbile API changes.
*/
class SessionState {
- public:
+ typedef std::vector<framing::AMQFrame> ReplayList;
- /** State for commands sent. Records commands for replay,
- * tracks confirmation and completion of sent commands.
- */
-class SendState {
public:
- typedef std::vector<framing::AMQFrame> ReplayList;
+
+ typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
+
+ struct Configuration {
+ Configuration(size_t flush=0, size_t hard=0);
+ size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables.
+ size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables.
+ };
+
+ SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
+
+ virtual ~SessionState();
+
+ bool hasState() const;
+
+ const SessionId& getId() const { return id; }
+
+ uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t seconds) { timeout = seconds; }
+
+ bool operator==(const SessionId& other) const { return id == other; }
+ bool operator==(const SessionState& other) const { return id == other.id; }
+
+ // ==== Functions for sender state.
/** Record frame f for replay. Should not be called during replay. */
- void record(const framing::AMQFrame& f);
+ virtual void senderRecord(const framing::AMQFrame& f);
/** @return true if we should send flush for confirmed and completed commands. */
- bool needFlush() const;
+ virtual bool senderNeedFlush() const;
/** Called when flush for confirmed and completed commands is sent to peer. */
- void recordFlush();
+ virtual void senderRecordFlush();
/** Called when the peer confirms up to comfirmed. */
- void confirmed(const SessionPoint& confirmed);
+ virtual void senderConfirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
- void completed(const SequenceSet& commands);
+ virtual void senderCompleted(const SequenceSet& commands);
- /** Point from which we can replay. All data < replayPoint is confirmed. */
- const SessionPoint& getReplayPoint() const { return replayPoint; }
-
- /** Get the replay list, starting from getReplayPoint() */
- // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&.
- ReplayList& getReplayList() { return replayList; }
-
- /** Point from which the next data will be sent. */
- const SessionPoint& getCommandPoint();
+ /** Point from which the next new (not replayed) data will be sent. */
+ virtual SessionPoint senderGetCommandPoint();
/** Set of outstanding incomplete commands */
- const SequenceSet& getIncomplete() const { return incomplete; }
+ virtual SequenceSet senderGetIncomplete() const;
+
+ /** Point from which we can replay. */
+ virtual SessionPoint senderGetReplayPoint() const;
/** Peer expecting commands from this point.
- *@return true if replay is required, sets replayPoint.
+ virtual *@return Range of frames to be replayed.
*/
- bool expected(const SessionPoint& expected);
+ virtual ReplayRange senderExpected(const SessionPoint& expected);
- private:
- SendState(SessionState& s);
- SessionState* session;
- // invariant: replayPoint <= flushPoint <= sendPoint
- SessionPoint replayPoint; // Can replay from this point
- SessionPoint flushPoint; // Point of last flush
- SessionPoint sendPoint; // Send from this point
- ReplayList replayList; // Starts from replayPoint.
- size_t unflushedSize; // Un-flushed bytes in replay list.
- SequenceSet incomplete; // Commands sent and not yet completed.
+ // ==== Functions for receiver state
- friend class SessionState;
-};
-
- /** State for commands received.
- * Idempotence barrier for duplicate commands, tracks completion
- * and of received commands.
- */
-class ReceiveState {
- public:
/** Set the command point. */
- void setCommandPoint(const SessionPoint& point);
+ virtual void receiverSetCommandPoint(const SessionPoint& point);
/** Returns true if frame should be be processed, false if it is a duplicate. */
- bool record(const framing::AMQFrame& f);
+ virtual bool receiverRecord(const framing::AMQFrame& f);
/** Command completed locally */
- void completed(SequenceNumber command, bool cumulative=false);
+ virtual void receiverCompleted(SequenceNumber command, bool cumulative=false);
/** Peer has indicated commands are known completed */
- void knownCompleted(const SequenceSet& commands);
+ virtual void receiverKnownCompleted(const SequenceSet& commands);
/** Get the incoming command point */
- const SessionPoint& getExpected() const { return expected; }
+ virtual const SessionPoint& receiverGetExpected() const;
/** Get the received high-water-mark, may be > getExpected() during replay */
- const SessionPoint& getReceived() const { return received; }
+ virtual const SessionPoint& receiverGetReceived() const;
/** Completed commands that the peer may not know about */
- const SequenceSet& getUnknownComplete() const { return unknownCompleted; }
+ virtual const SequenceSet& receiverGetUnknownComplete() const;
/** ID of the command currently being handled. */
- SequenceNumber getCurrent() const;
+ virtual SequenceNumber receiverGetCurrent() const;
private:
- ReceiveState(SessionState&);
+ struct SendState {
+ SendState() : session(), unflushedSize(), replaySize() {}
+ SessionState* session;
+ // invariant: replayPoint <= flushPoint <= sendPoint
+ SessionPoint replayPoint; // Can replay from this point
+ SessionPoint flushPoint; // Point of last flush
+ SessionPoint sendPoint; // Send from this point
+ ReplayList replayList; // Starts from replayPoint.
+ size_t unflushedSize; // Un-flushed bytes in replay list.
+ size_t replaySize; // Total bytes in replay list.
+ SequenceSet incomplete; // Commands sent and not yet completed.
+ } sender;
+
+ struct ReceiveState {
+ ReceiveState() : session() {}
SessionState* session;
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
SequenceNumber firstIncomplete; // First incomplete command.
+ } receiver;
- friend class SessionState;
- };
-
- struct Configuration {
- Configuration();
- size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes
- size_t replayKillSize; // Kill session if replay list grows beyond N bytes.
- };
-
- SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
-
- virtual ~SessionState();
-
- const SessionId& getId() const { return id; }
- uint32_t getTimeout() const { return timeout; }
- void setTimeout(uint32_t seconds) { timeout = seconds; }
-
- bool operator==(const SessionId& other) const { return id == other; }
- bool operator==(const SessionState& other) const { return id == other.id; }
-
- SendState sender; ///< State for commands sent
- ReceiveState receiver; ///< State for commands received
-
- bool hasState() const;
-
- private:
SessionId id;
uint32_t timeout;
Configuration config;
bool stateful;
-
- friend class SendState;
- friend class ReceiveState;
};
inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }