diff options
Diffstat (limited to 'cpp/src/qpid/SessionState.h')
-rw-r--r-- | cpp/src/qpid/SessionState.h | 144 |
1 files changed, 64 insertions, 80 deletions
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 7957825dd3..47fabb04c8 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -28,6 +28,7 @@ #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FrameHandler.h> #include <boost/operators.hpp> +#include <boost/range/iterator_range.hpp> #include <vector> #include <iosfwd> @@ -70,135 +71,118 @@ std::ostream& operator<<(std::ostream&, const SessionPoint&); * source-incompatbile API changes. */ class SessionState { - public: + typedef std::vector<framing::AMQFrame> ReplayList; - /** State for commands sent. Records commands for replay, - * tracks confirmation and completion of sent commands. - */ -class SendState { public: - typedef std::vector<framing::AMQFrame> ReplayList; + + typedef boost::iterator_range<ReplayList::iterator> ReplayRange; + + struct Configuration { + Configuration(size_t flush=0, size_t hard=0); + size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables. + size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables. + }; + + SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + + virtual ~SessionState(); + + bool hasState() const; + + const SessionId& getId() const { return id; } + + uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t seconds) { timeout = seconds; } + + bool operator==(const SessionId& other) const { return id == other; } + bool operator==(const SessionState& other) const { return id == other.id; } + + // ==== Functions for sender state. /** Record frame f for replay. Should not be called during replay. */ - void record(const framing::AMQFrame& f); + virtual void senderRecord(const framing::AMQFrame& f); /** @return true if we should send flush for confirmed and completed commands. */ - bool needFlush() const; + virtual bool senderNeedFlush() const; /** Called when flush for confirmed and completed commands is sent to peer. */ - void recordFlush(); + virtual void senderRecordFlush(); /** Called when the peer confirms up to comfirmed. */ - void confirmed(const SessionPoint& confirmed); + virtual void senderConfirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ - void completed(const SequenceSet& commands); + virtual void senderCompleted(const SequenceSet& commands); - /** Point from which we can replay. All data < replayPoint is confirmed. */ - const SessionPoint& getReplayPoint() const { return replayPoint; } - - /** Get the replay list, starting from getReplayPoint() */ - // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. - ReplayList& getReplayList() { return replayList; } - - /** Point from which the next data will be sent. */ - const SessionPoint& getCommandPoint(); + /** Point from which the next new (not replayed) data will be sent. */ + virtual SessionPoint senderGetCommandPoint(); /** Set of outstanding incomplete commands */ - const SequenceSet& getIncomplete() const { return incomplete; } + virtual SequenceSet senderGetIncomplete() const; + + /** Point from which we can replay. */ + virtual SessionPoint senderGetReplayPoint() const; /** Peer expecting commands from this point. - *@return true if replay is required, sets replayPoint. + virtual *@return Range of frames to be replayed. */ - bool expected(const SessionPoint& expected); + virtual ReplayRange senderExpected(const SessionPoint& expected); - private: - SendState(SessionState& s); - SessionState* session; - // invariant: replayPoint <= flushPoint <= sendPoint - SessionPoint replayPoint; // Can replay from this point - SessionPoint flushPoint; // Point of last flush - SessionPoint sendPoint; // Send from this point - ReplayList replayList; // Starts from replayPoint. - size_t unflushedSize; // Un-flushed bytes in replay list. - SequenceSet incomplete; // Commands sent and not yet completed. + // ==== Functions for receiver state - friend class SessionState; -}; - - /** State for commands received. - * Idempotence barrier for duplicate commands, tracks completion - * and of received commands. - */ -class ReceiveState { - public: /** Set the command point. */ - void setCommandPoint(const SessionPoint& point); + virtual void receiverSetCommandPoint(const SessionPoint& point); /** Returns true if frame should be be processed, false if it is a duplicate. */ - bool record(const framing::AMQFrame& f); + virtual bool receiverRecord(const framing::AMQFrame& f); /** Command completed locally */ - void completed(SequenceNumber command, bool cumulative=false); + virtual void receiverCompleted(SequenceNumber command, bool cumulative=false); /** Peer has indicated commands are known completed */ - void knownCompleted(const SequenceSet& commands); + virtual void receiverKnownCompleted(const SequenceSet& commands); /** Get the incoming command point */ - const SessionPoint& getExpected() const { return expected; } + virtual const SessionPoint& receiverGetExpected() const; /** Get the received high-water-mark, may be > getExpected() during replay */ - const SessionPoint& getReceived() const { return received; } + virtual const SessionPoint& receiverGetReceived() const; /** Completed commands that the peer may not know about */ - const SequenceSet& getUnknownComplete() const { return unknownCompleted; } + virtual const SequenceSet& receiverGetUnknownComplete() const; /** ID of the command currently being handled. */ - SequenceNumber getCurrent() const; + virtual SequenceNumber receiverGetCurrent() const; private: - ReceiveState(SessionState&); + struct SendState { + SendState() : session(), unflushedSize(), replaySize() {} + SessionState* session; + // invariant: replayPoint <= flushPoint <= sendPoint + SessionPoint replayPoint; // Can replay from this point + SessionPoint flushPoint; // Point of last flush + SessionPoint sendPoint; // Send from this point + ReplayList replayList; // Starts from replayPoint. + size_t unflushedSize; // Un-flushed bytes in replay list. + size_t replaySize; // Total bytes in replay list. + SequenceSet incomplete; // Commands sent and not yet completed. + } sender; + + struct ReceiveState { + ReceiveState() : session() {} SessionState* session; SessionPoint expected; // Expected from here SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. SequenceNumber firstIncomplete; // First incomplete command. + } receiver; - friend class SessionState; - }; - - struct Configuration { - Configuration(); - size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes - size_t replayKillSize; // Kill session if replay list grows beyond N bytes. - }; - - SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); - - virtual ~SessionState(); - - const SessionId& getId() const { return id; } - uint32_t getTimeout() const { return timeout; } - void setTimeout(uint32_t seconds) { timeout = seconds; } - - bool operator==(const SessionId& other) const { return id == other; } - bool operator==(const SessionState& other) const { return id == other.id; } - - SendState sender; ///< State for commands sent - ReceiveState receiver; ///< State for commands received - - bool hasState() const; - - private: SessionId id; uint32_t timeout; Configuration config; bool stateful; - - friend class SendState; - friend class ReceiveState; }; inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } |