summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-04-27 18:32:26 +0000
committerAlan Conway <aconway@apache.org>2008-04-27 18:32:26 +0000
commit4ef7f45b5fc6eea2b9d29245f9c27f58468b8bb2 (patch)
tree2e94c0c863d2b87c4282c729f07a265873d347cf /cpp/src/qpid
parent958d756944ca7f265d41528e5d7c6b8a25b2cf75 (diff)
downloadqpid-python-4ef7f45b5fc6eea2b9d29245f9c27f58468b8bb2.tar.gz
Session state as per AMQP 0-10 specification.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651997 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/SessionState.cpp165
-rw-r--r--cpp/src/qpid/SessionState.h188
-rw-r--r--cpp/src/qpid/framing/SequenceSet.h2
3 files changed, 355 insertions, 0 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
new file mode 100644
index 0000000000..64fdd17b8f
--- /dev/null
+++ b/cpp/src/qpid/SessionState.cpp
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// FIXME aconway 2008-04-24: Reminders for handler implementation.
+//
+// - execution.sync results must be communicated to SessionState::peerConfirmed.
+//
+//
+
+#include "SessionState.h"
+#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include <boost/bind.hpp>
+#include <numeric>
+
+namespace qpid {
+using framing::AMQFrame;
+using amqp_0_10::NotImplementedException;
+
+/** A point in the session - command id + offset */
+void SessionPoint::advance(const AMQFrame& f) {
+ if (f.isLastSegment() && f.isLastFrame()) {
+ ++command;
+ offset = 0;
+ }
+ else {
+ // TODO aconway 2008-04-24: if we go to support for partial
+ // command replay, then it may be better to record the unframed
+ // data size in a command point rather than the framed size so
+ // that the relationship of fragment offsets to the replay
+ // list can be computed more easily.
+ //
+ offset += f.size();
+ }
+}
+
+bool SessionPoint::operator<(const SessionPoint& x) const {
+ return command < x.command || (command == x.command && offset < x.offset);
+}
+
+bool SessionPoint::operator==(const SessionPoint& x) const {
+ return command == x.command && offset == x.offset;
+}
+
+SendState::SendState(size_t syncSize, size_t killSize)
+ : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {}
+
+void SendState::send(const AMQFrame& f) {
+ if (f.getMethod() && f.getMethod()->type() == 0)
+ return; // Don't replay control frames.
+ replayList.push_back(f);
+ unflushedSize += f.size();
+ sendPoint.advance(f);
+}
+
+bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; }
+
+void SendState::sendFlush() {
+ assert(flushPoint <= sendPoint);
+ flushPoint = sendPoint;
+ unflushedSize = 0;
+}
+
+void SendState::peerConfirmed(const SessionPoint& confirmed) {
+ ReplayList::iterator i = replayList.begin();
+ // Ignore peerConfirmed.offset, we don't support partial replay.
+ while (i != replayList.end() && replayPoint.command < confirmed.command) {
+ assert(replayPoint <= flushPoint);
+ replayPoint.advance(*i);
+ assert(replayPoint <= sendPoint);
+ if (replayPoint > flushPoint) {
+ flushPoint.advance(*i);
+ assert(replayPoint <= flushPoint);
+ unflushedSize -= i->size();
+ }
+ ++i;
+ }
+ replayList.erase(replayList.begin(), i);
+ assert(replayPoint.offset == 0);
+}
+
+void SendState::peerCompleted(const SequenceSet& commands) {
+ if (commands.empty()) return;
+ sentCompleted += commands;
+ // Completion implies confirmation but we don't handle out-of-order
+ // confirmation, so confirm only the first contiguous range of commands.
+ peerConfirmed(SessionPoint(commands.rangesBegin()->end()));
+}
+
+bool ReceiveState::hasState() { return stateful; }
+
+void ReceiveState::setExpecting(const SessionPoint& point) {
+ if (!hasState()) // initializing a new session.
+ expecting = received = point;
+ else { // setting point in an existing session.
+ if (point > received)
+ throw NotImplementedException("command-point out of bounds.");
+ expecting = point;
+ }
+}
+
+ReceiveState::ReceiveState() : stateful() {}
+
+bool ReceiveState::receive(const AMQFrame& f) {
+ stateful = true;
+ expecting.advance(f);
+ if (expecting > received) {
+ received = expecting;
+ return true;
+ }
+ return false;
+}
+
+void ReceiveState::localCompleted(SequenceNumber command) {
+ assert(command < received.command); // Can't complete what we haven't received.
+ receivedCompleted += command;
+}
+
+void ReceiveState::peerKnownComplete(const SequenceSet& commands) {
+ receivedCompleted -= commands;
+}
+
+SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {}
+
+bool SessionId::operator<(const SessionId& id) const {
+ return userId < id.userId || (userId == id.userId && name < id.name);
+}
+
+bool SessionId::operator==(const SessionId& id) const {
+ return id.name == name && id.userId == userId;
+}
+
+SessionState::Configuration::Configuration()
+ : replaySyncSize(std::numeric_limits<size_t>::max()),
+ replayKillSize(std::numeric_limits<size_t>::max()) {}
+
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+ : SendState(c.replaySyncSize, c.replayKillSize),
+ id(i), timeout(), config(c) {}
+
+void SessionState::clear() { *this = SessionState(id, config); }
+
+std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
+ return o << "(" << p.command.getValue() << "+" << p.offset << ")";
+}
+
+} // namespace qpid
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
new file mode 100644
index 0000000000..b836534ee7
--- /dev/null
+++ b/cpp/src/qpid/SessionState.h
@@ -0,0 +1,188 @@
+#ifndef QPID_SESSIONSTATE_H
+#define QPID_SESSIONSTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/SequenceNumber.h>
+#include <qpid/framing/SequenceSet.h>
+#include <qpid/framing/AMQFrame.h>
+#include <boost/operators.hpp>
+#include <vector>
+#include <iosfwd>
+
+namespace qpid {
+using framing::SequenceNumber;
+using framing::SequenceSet;
+
+/** A point in the session. Points to command id + offset */
+struct SessionPoint : boost::totally_ordered1<SessionPoint> {
+ SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {}
+
+ SequenceNumber command;
+ uint64_t offset;
+
+ /** Advance past frame f */
+ void advance(const framing::AMQFrame& f);
+
+ bool operator<(const SessionPoint&) const;
+ bool operator==(const SessionPoint&) const;
+};
+
+std::ostream& operator<<(std::ostream&, const SessionPoint&);
+
+/** The sending half of session state */
+class SendState {
+ public:
+ typedef std::vector<framing::AMQFrame> ReplayList;
+
+ /** Record frame f for replay. Should not be called during replay. */
+ void send(const framing::AMQFrame& f);
+
+ /** @return true if we should send flush for confirmed and completed commands. */
+ bool needFlush() const;
+
+ /** Called when flush for confirmed and completed commands is sent to peer. */
+ void sendFlush();
+
+ /** Called when the peer confirms up to commands. */
+ void peerConfirmed(const SessionPoint& confirmed);
+
+ /** Called when the peer indicates commands completed */
+ void peerCompleted(const SequenceSet& commands);
+
+ /** Get the replay list. @see getReplayPoint. */
+ const ReplayList& getReplayList() const { return replayList; }
+
+ /**
+ * The replay point is the point up to which all data has been
+ * confirmed. Partial replay is not supported, it will always
+ * have offset==0.
+ */
+ const SessionPoint& getReplayPoint() const { return replayPoint; }
+
+ const SessionPoint& getSendPoint() const { return sendPoint; }
+ const SequenceSet& getCompleted() const { return sentCompleted; }
+
+ protected:
+ SendState(size_t replaySyncSize, size_t replayKillSize);
+
+ private:
+ size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration.
+ // invariant: replayPoint <= flushPoint <= sendPoint
+ SessionPoint replayPoint; // Can replay from this point
+ SessionPoint sendPoint; // Send from this point
+ SessionPoint flushPoint; // Point of last flush
+ ReplayList replayList; // Starts from replayPoint.
+ size_t unflushedSize; // Un-flushed bytes in replay list.
+ SequenceSet sentCompleted; // Commands sent and acknowledged as completed.
+};
+
+/** Receiving half of SessionState */
+class ReceiveState {
+ public:
+ bool hasState();
+
+ /** Set the command point. */
+ void setExpecting(const SessionPoint& point);
+
+ /** Returns true if frame should be be processed, false if it is a duplicate. */
+ bool receive(const framing::AMQFrame& f);
+
+ /** Command completed locally */
+ void localCompleted(SequenceNumber command);
+
+ /** Peer has indicated commands are known completed */
+ void peerKnownComplete(const SequenceSet& commands);
+
+ /** Recieved, completed and possibly not known by peer to be completed */
+ const SequenceSet& getReceivedCompleted() const { return receivedCompleted; }
+ const SessionPoint& getExpecting() const { return expecting; }
+ const SessionPoint& getReceived() const { return received; }
+
+ protected:
+ ReceiveState();
+
+ private:
+ bool stateful; // True if session has state.
+ SessionPoint expecting; // Expecting from here
+ SessionPoint received; // Received to here. Invariant: expecting <= received.
+ SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer
+};
+
+/** Identifier for a session */
+class SessionId : boost::totally_ordered1<SessionId> {
+ std::string userId;
+ std::string name;
+ public:
+ SessionId(const std::string& userId=std::string(), const std::string& name=std::string());
+ std::string getUserId() const { return userId; }
+ std::string getName() const { return name; }
+ bool operator<(const SessionId&) const ;
+ bool operator==(const SessionId& id) const;
+};
+
+
+/**
+ * Support for session idempotence barrier and resume as defined in
+ * AMQP 0-10.
+ *
+ * We only issue/use contiguous confirmations, out-of-order confirmation
+ * is ignored. Out of order completion is fully supported.
+ *
+ * Raises NotImplemented if the command point is set greater than the
+ * max currently received command data, either explicitly via
+ * session.command-point or implicitly via session.gap.
+ *
+ * Partial replay is not supported, replay always begins on a command
+ * boundary, and we never confirm partial commands.
+ *
+ * The SessionPoint data structure does store offsets so this class
+ * could be extended to support partial replay without
+ * source-incompatbile API changes.
+ */
+class SessionState : public SendState, public ReceiveState {
+ public:
+ struct Configuration {
+ Configuration();
+ size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes
+ size_t replayKillSize; // Kill session if replay list grows beyond N bytes.
+ };
+
+ SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
+
+ const SessionId& getId() const { return id; }
+ uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t seconds) { timeout = seconds; }
+
+ /** Clear all state except Id. */
+ void clear();
+
+ private:
+ SessionId id;
+ uint32_t timeout;
+ Configuration config;
+};
+
+} // namespace qpid
+
+
+#endif /*!QPID_SESSIONSTATE_H*/
diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h
index f934bb40bb..029a26818e 100644
--- a/cpp/src/qpid/framing/SequenceSet.h
+++ b/cpp/src/qpid/framing/SequenceSet.h
@@ -31,6 +31,8 @@ class Buffer;
class SequenceSet : public RangeSet<SequenceNumber> {
public:
SequenceSet() {}
+ explicit SequenceSet(const RangeSet<SequenceNumber>& r)
+ : RangeSet<SequenceNumber>(r) {}
explicit SequenceSet(const SequenceNumber& s) { add(s); }
void encode(Buffer& buffer) const;