diff options
| author | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
| commit | 4ef7f45b5fc6eea2b9d29245f9c27f58468b8bb2 (patch) | |
| tree | 2e94c0c863d2b87c4282c729f07a265873d347cf /cpp/src/qpid | |
| parent | 958d756944ca7f265d41528e5d7c6b8a25b2cf75 (diff) | |
| download | qpid-python-4ef7f45b5fc6eea2b9d29245f9c27f58468b8bb2.tar.gz | |
Session state as per AMQP 0-10 specification.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651997 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/SessionState.cpp | 165 | ||||
| -rw-r--r-- | cpp/src/qpid/SessionState.h | 188 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SequenceSet.h | 2 |
3 files changed, 355 insertions, 0 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp new file mode 100644 index 0000000000..64fdd17b8f --- /dev/null +++ b/cpp/src/qpid/SessionState.cpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// FIXME aconway 2008-04-24: Reminders for handler implementation. +// +// - execution.sync results must be communicated to SessionState::peerConfirmed. +// +// + +#include "SessionState.h" +#include "qpid/amqp_0_10/exceptions.h" +#include "qpid/framing/AMQMethodBody.h" +#include <boost/bind.hpp> +#include <numeric> + +namespace qpid { +using framing::AMQFrame; +using amqp_0_10::NotImplementedException; + +/** A point in the session - command id + offset */ +void SessionPoint::advance(const AMQFrame& f) { + if (f.isLastSegment() && f.isLastFrame()) { + ++command; + offset = 0; + } + else { + // TODO aconway 2008-04-24: if we go to support for partial + // command replay, then it may be better to record the unframed + // data size in a command point rather than the framed size so + // that the relationship of fragment offsets to the replay + // list can be computed more easily. + // + offset += f.size(); + } +} + +bool SessionPoint::operator<(const SessionPoint& x) const { + return command < x.command || (command == x.command && offset < x.offset); +} + +bool SessionPoint::operator==(const SessionPoint& x) const { + return command == x.command && offset == x.offset; +} + +SendState::SendState(size_t syncSize, size_t killSize) + : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {} + +void SendState::send(const AMQFrame& f) { + if (f.getMethod() && f.getMethod()->type() == 0) + return; // Don't replay control frames. + replayList.push_back(f); + unflushedSize += f.size(); + sendPoint.advance(f); +} + +bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; } + +void SendState::sendFlush() { + assert(flushPoint <= sendPoint); + flushPoint = sendPoint; + unflushedSize = 0; +} + +void SendState::peerConfirmed(const SessionPoint& confirmed) { + ReplayList::iterator i = replayList.begin(); + // Ignore peerConfirmed.offset, we don't support partial replay. + while (i != replayList.end() && replayPoint.command < confirmed.command) { + assert(replayPoint <= flushPoint); + replayPoint.advance(*i); + assert(replayPoint <= sendPoint); + if (replayPoint > flushPoint) { + flushPoint.advance(*i); + assert(replayPoint <= flushPoint); + unflushedSize -= i->size(); + } + ++i; + } + replayList.erase(replayList.begin(), i); + assert(replayPoint.offset == 0); +} + +void SendState::peerCompleted(const SequenceSet& commands) { + if (commands.empty()) return; + sentCompleted += commands; + // Completion implies confirmation but we don't handle out-of-order + // confirmation, so confirm only the first contiguous range of commands. + peerConfirmed(SessionPoint(commands.rangesBegin()->end())); +} + +bool ReceiveState::hasState() { return stateful; } + +void ReceiveState::setExpecting(const SessionPoint& point) { + if (!hasState()) // initializing a new session. + expecting = received = point; + else { // setting point in an existing session. + if (point > received) + throw NotImplementedException("command-point out of bounds."); + expecting = point; + } +} + +ReceiveState::ReceiveState() : stateful() {} + +bool ReceiveState::receive(const AMQFrame& f) { + stateful = true; + expecting.advance(f); + if (expecting > received) { + received = expecting; + return true; + } + return false; +} + +void ReceiveState::localCompleted(SequenceNumber command) { + assert(command < received.command); // Can't complete what we haven't received. + receivedCompleted += command; +} + +void ReceiveState::peerKnownComplete(const SequenceSet& commands) { + receivedCompleted -= commands; +} + +SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} + +bool SessionId::operator<(const SessionId& id) const { + return userId < id.userId || (userId == id.userId && name < id.name); +} + +bool SessionId::operator==(const SessionId& id) const { + return id.name == name && id.userId == userId; +} + +SessionState::Configuration::Configuration() + : replaySyncSize(std::numeric_limits<size_t>::max()), + replayKillSize(std::numeric_limits<size_t>::max()) {} + +SessionState::SessionState(const SessionId& i, const Configuration& c) + : SendState(c.replaySyncSize, c.replayKillSize), + id(i), timeout(), config(c) {} + +void SessionState::clear() { *this = SessionState(id, config); } + +std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { + return o << "(" << p.command.getValue() << "+" << p.offset << ")"; +} + +} // namespace qpid diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h new file mode 100644 index 0000000000..b836534ee7 --- /dev/null +++ b/cpp/src/qpid/SessionState.h @@ -0,0 +1,188 @@ +#ifndef QPID_SESSIONSTATE_H +#define QPID_SESSIONSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/SequenceNumber.h> +#include <qpid/framing/SequenceSet.h> +#include <qpid/framing/AMQFrame.h> +#include <boost/operators.hpp> +#include <vector> +#include <iosfwd> + +namespace qpid { +using framing::SequenceNumber; +using framing::SequenceSet; + +/** A point in the session. Points to command id + offset */ +struct SessionPoint : boost::totally_ordered1<SessionPoint> { + SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {} + + SequenceNumber command; + uint64_t offset; + + /** Advance past frame f */ + void advance(const framing::AMQFrame& f); + + bool operator<(const SessionPoint&) const; + bool operator==(const SessionPoint&) const; +}; + +std::ostream& operator<<(std::ostream&, const SessionPoint&); + +/** The sending half of session state */ +class SendState { + public: + typedef std::vector<framing::AMQFrame> ReplayList; + + /** Record frame f for replay. Should not be called during replay. */ + void send(const framing::AMQFrame& f); + + /** @return true if we should send flush for confirmed and completed commands. */ + bool needFlush() const; + + /** Called when flush for confirmed and completed commands is sent to peer. */ + void sendFlush(); + + /** Called when the peer confirms up to commands. */ + void peerConfirmed(const SessionPoint& confirmed); + + /** Called when the peer indicates commands completed */ + void peerCompleted(const SequenceSet& commands); + + /** Get the replay list. @see getReplayPoint. */ + const ReplayList& getReplayList() const { return replayList; } + + /** + * The replay point is the point up to which all data has been + * confirmed. Partial replay is not supported, it will always + * have offset==0. + */ + const SessionPoint& getReplayPoint() const { return replayPoint; } + + const SessionPoint& getSendPoint() const { return sendPoint; } + const SequenceSet& getCompleted() const { return sentCompleted; } + + protected: + SendState(size_t replaySyncSize, size_t replayKillSize); + + private: + size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration. + // invariant: replayPoint <= flushPoint <= sendPoint + SessionPoint replayPoint; // Can replay from this point + SessionPoint sendPoint; // Send from this point + SessionPoint flushPoint; // Point of last flush + ReplayList replayList; // Starts from replayPoint. + size_t unflushedSize; // Un-flushed bytes in replay list. + SequenceSet sentCompleted; // Commands sent and acknowledged as completed. +}; + +/** Receiving half of SessionState */ +class ReceiveState { + public: + bool hasState(); + + /** Set the command point. */ + void setExpecting(const SessionPoint& point); + + /** Returns true if frame should be be processed, false if it is a duplicate. */ + bool receive(const framing::AMQFrame& f); + + /** Command completed locally */ + void localCompleted(SequenceNumber command); + + /** Peer has indicated commands are known completed */ + void peerKnownComplete(const SequenceSet& commands); + + /** Recieved, completed and possibly not known by peer to be completed */ + const SequenceSet& getReceivedCompleted() const { return receivedCompleted; } + const SessionPoint& getExpecting() const { return expecting; } + const SessionPoint& getReceived() const { return received; } + + protected: + ReceiveState(); + + private: + bool stateful; // True if session has state. + SessionPoint expecting; // Expecting from here + SessionPoint received; // Received to here. Invariant: expecting <= received. + SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer +}; + +/** Identifier for a session */ +class SessionId : boost::totally_ordered1<SessionId> { + std::string userId; + std::string name; + public: + SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); + std::string getUserId() const { return userId; } + std::string getName() const { return name; } + bool operator<(const SessionId&) const ; + bool operator==(const SessionId& id) const; +}; + + +/** + * Support for session idempotence barrier and resume as defined in + * AMQP 0-10. + * + * We only issue/use contiguous confirmations, out-of-order confirmation + * is ignored. Out of order completion is fully supported. + * + * Raises NotImplemented if the command point is set greater than the + * max currently received command data, either explicitly via + * session.command-point or implicitly via session.gap. + * + * Partial replay is not supported, replay always begins on a command + * boundary, and we never confirm partial commands. + * + * The SessionPoint data structure does store offsets so this class + * could be extended to support partial replay without + * source-incompatbile API changes. + */ +class SessionState : public SendState, public ReceiveState { + public: + struct Configuration { + Configuration(); + size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes + size_t replayKillSize; // Kill session if replay list grows beyond N bytes. + }; + + SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + + const SessionId& getId() const { return id; } + uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t seconds) { timeout = seconds; } + + /** Clear all state except Id. */ + void clear(); + + private: + SessionId id; + uint32_t timeout; + Configuration config; +}; + +} // namespace qpid + + +#endif /*!QPID_SESSIONSTATE_H*/ diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h index f934bb40bb..029a26818e 100644 --- a/cpp/src/qpid/framing/SequenceSet.h +++ b/cpp/src/qpid/framing/SequenceSet.h @@ -31,6 +31,8 @@ class Buffer; class SequenceSet : public RangeSet<SequenceNumber> { public: SequenceSet() {} + explicit SequenceSet(const RangeSet<SequenceNumber>& r) + : RangeSet<SequenceNumber>(r) {} explicit SequenceSet(const SequenceNumber& s) { add(s); } void encode(Buffer& buffer) const; |
