diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/SessionState.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 287 |
1 files changed, 0 insertions, 287 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp deleted file mode 100644 index e5019604d2..0000000000 --- a/cpp/src/qpid/SessionState.cpp +++ /dev/null @@ -1,287 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/SessionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/framing/enum.h" -#include "qpid/log/Statement.h" -#include <boost/bind.hpp> -#include <numeric> - -namespace qpid { -using framing::AMQFrame; -using framing::NotImplementedException; -using framing::InvalidArgumentException; -using framing::IllegalStateException; -using framing::ResourceLimitExceededException; -using framing::InternalErrorException; -using framing::FramingErrorException; - -namespace { -bool isControl(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL; -} -bool isCommand(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND; -} -} // namespace - -SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), offset(o) {} - -// TODO aconway 2008-05-22: Do complete frame sequence validity check here, -// currently duplicated betwen broker and client session impl. -// -void SessionPoint::advance(const AMQFrame& f) { - if (isControl(f)) return; // Ignore controls. - if (f.isFirstSegment() && f.isFirstFrame()) { - if (offset != 0) - throw FramingErrorException(QPID_MSG("Unexpected command start frame.")); - if (!isCommand(f)) - throw FramingErrorException( - QPID_MSG("Command start frame has invalid type" << f.getBody()->type())); - if (f.isLastSegment() && f.isLastFrame()) - ++command; // Single-frame command. - else - offset += f.encodedSize(); - } - else { // continuation frame for partial command - if (offset == 0) - throw FramingErrorException(QPID_MSG("Unexpected command continuation frame.")); - if (f.isLastSegment() && f.isLastFrame()) { - ++command; - offset = 0; - } - else { - // TODO aconway 2008-04-24: if we go to support for partial - // command replay, then it may be better to record the unframed - // data size in a command point rather than the framed size so - // that the relationship of fragment offsets to the replay - // list can be computed more easily. - // - offset += f.encodedSize(); - } - } -} - -bool SessionPoint::operator<(const SessionPoint& x) const { - return command < x.command || (command == x.command && offset < x.offset); -} - -bool SessionPoint::operator==(const SessionPoint& x) const { - return command == x.command && offset == x.offset; -} - - -SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSinceKnownCompleted() {} - -SessionState::ReceiveState::ReceiveState() : bytesSinceKnownCompleted() {} - -uint32_t SessionState::getTimeout() const { return timeout; } -void SessionState::setTimeout(uint32_t seconds) { timeout = seconds; } - -SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } -SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } -SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } - -SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) { - if (expect < sender.replayPoint || sender.sendPoint < expect) - throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range.")); - QPID_LOG(debug, getId() << ": sender expected point moved to " << expect); - ReplayList::iterator i = sender.replayList.begin(); - SessionPoint p = sender.replayPoint; - while (i != sender.replayList.end() && p.command < expect.command) - p.advance(*i++); - assert(p.command == expect.command); - return boost::make_iterator_range(i, sender.replayList.end()); -} - -void SessionState::senderRecord(const AMQFrame& f) { - if (isControl(f)) return; // Ignore control frames. - QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody()); - - stateful = true; - if (timeout) sender.replayList.push_back(f); - sender.unflushedSize += f.encodedSize(); - sender.bytesSinceKnownCompleted += f.encodedSize(); - sender.replaySize += f.encodedSize(); - sender.incomplete += sender.sendPoint.command; - sender.sendPoint.advance(f); - if (config.replayHardLimit && config.replayHardLimit < sender.replaySize) - throw ResourceLimitExceededException("Replay buffer exceeeded hard limit"); -} - -static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536; - -bool SessionState::senderNeedFlush() const { - return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) || - (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit); -} - -void SessionState::senderRecordFlush() { - sender.flushPoint = sender.sendPoint; - sender.unflushedSize = 0; -} - -bool SessionState::senderNeedKnownCompleted() const { - return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit; -} - -void SessionState::senderRecordKnownCompleted() { - sender.bytesSinceKnownCompleted = 0; -} - -void SessionState::senderConfirmed(const SessionPoint& confirmed) { - if (confirmed > sender.sendPoint) - throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint)); - QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed); - ReplayList::iterator i = sender.replayList.begin(); - while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { - sender.replayPoint.advance(*i); - assert(sender.replayPoint <= sender.sendPoint); - sender.replaySize -= i->encodedSize(); - if (sender.replayPoint > sender.flushPoint) - sender.unflushedSize -= i->encodedSize(); - ++i; - } - if (sender.replayPoint > sender.flushPoint) - sender.flushPoint = sender.replayPoint; - sender.replayList.erase(sender.replayList.begin(), i); - assert(sender.replayPoint.offset == 0); -} - -void SessionState::senderCompleted(const SequenceSet& commands) { - if (commands.empty()) return; - QPID_LOG(debug, getId() << ": sender marked completed: " << commands); - sender.incomplete -= commands; - // Completion implies confirmation but we don't handle out-of-order - // confirmation, so confirm up to the end of the first contiguous range of commands. - senderConfirmed(SessionPoint(commands.rangesBegin()->end())); -} - -void SessionState::receiverSetCommandPoint(const SessionPoint& point) { - if (hasState() && point > receiver.received) - throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range.")); - QPID_LOG(debug, getId() << ": receiver command-point set to: " << point); - receiver.expected = point; - if (receiver.expected > receiver.received) - receiver.received = receiver.expected; -} - -bool SessionState::receiverRecord(const AMQFrame& f) { - if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges - if (isControl(f)) return true; // Ignore control frames. - stateful = true; - receiver.expected.advance(f); - receiver.bytesSinceKnownCompleted += f.encodedSize(); - bool firstTime = receiver.expected > receiver.received; - if (firstTime) { - receiver.received = receiver.expected; - receiver.incomplete += receiverGetCurrent(); - } - QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody()); - if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame."); - return firstTime; -} - -void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { - if (receiverTrackingDisabled) return; //Very nasty hack for push bridges - assert(receiver.incomplete.contains(command)); // Internal error to complete command twice. - SequenceNumber first =cumulative ? receiver.incomplete.front() : command; - SequenceNumber last = command; - receiver.unknownCompleted.add(first, last); - receiver.incomplete.remove(first, last); - QPID_LOG(debug, getId() << ": receiver marked completed: " << command - << " incomplete: " << receiver.incomplete - << " unknown-completed: " << receiver.unknownCompleted); -} - -void SessionState::receiverKnownCompleted(const SequenceSet& commands) { - if (!commands.empty() && commands.back() > receiver.received.command) - throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands.")); - receiver.bytesSinceKnownCompleted=0; - receiver.unknownCompleted -= commands; - QPID_LOG(debug, getId() << ": receiver known completed: " << commands << " unknown: " << receiver.unknownCompleted); -} - -bool SessionState::receiverNeedKnownCompleted() const { - return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) || - (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit); -} - -const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } -const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } -const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } -const SequenceSet& SessionState::receiverGetIncomplete() const { return receiver.incomplete; } - -SequenceNumber SessionState::receiverGetCurrent() const { - SequenceNumber current = receiver.expected.command; - if (receiver.expected.offset == 0) - --current; - return current; -} - -SessionState::Configuration::Configuration(size_t flush, size_t hard) : - replayFlushLimit(flush), replayHardLimit(hard) {} - -SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(0), config(c), stateful(false), receiverTrackingDisabled(false) -{ - QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); -} - -bool SessionState::hasState() const { return stateful; } - -SessionState::~SessionState() {} - -std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { - return o << "(" << p.command.getValue() << "+" << p.offset << ")"; -} - -void SessionState::setState( - const SequenceNumber& replayStart, - const SequenceNumber& sendCommandPoint, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete -) -{ - sender.replayPoint = replayStart; - sender.flushPoint = sendCommandPoint; - sender.sendPoint = sendCommandPoint; - sender.unflushedSize = 0; - sender.replaySize = 0; // Replay list will be updated separately. - sender.incomplete = sentIncomplete; - sender.bytesSinceKnownCompleted = 0; - - receiver.expected = expected; - receiver.received = received; - receiver.unknownCompleted = unknownCompleted; - receiver.incomplete = receivedIncomplete; - receiver.bytesSinceKnownCompleted = 0; -} - -void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; } -void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; } - -} // namespace qpid |