diff options
author | Alan Conway <aconway@apache.org> | 2008-05-23 13:39:07 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-23 13:39:07 +0000 |
commit | 896d94f7c27e958806b96b537a7a96208ede145a (patch) | |
tree | 35c139fcc037fde8fef675d9d730882d22781931 /cpp/src | |
parent | bb4584d228b837fa70839560d72bc2a59dc1aa17 (diff) | |
download | qpid-python-896d94f7c27e958806b96b537a7a96208ede145a.tar.gz |
qpid::SessionState: Added error checking for invalid frame sequences.
client: Fix client crash on error during connection shutdown.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/SessionState.cpp | 11 |
10 files changed, 86 insertions, 46 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 96f2cca726..28e433b911 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -20,7 +20,7 @@ */ #include "SessionState.h" -#include "qpid/amqp_0_10/exceptions.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -28,32 +28,56 @@ namespace qpid { using framing::AMQFrame; -using amqp_0_10::NotImplementedException; -using amqp_0_10::InvalidArgumentException; -using amqp_0_10::IllegalStateException; -using amqp_0_10::ResourceLimitExceededException; -using amqp_0_10::InternalErrorException; +using framing::NotImplementedException; +using framing::InvalidArgumentException; +using framing::IllegalStateException; +using framing::ResourceLimitExceededException; +using framing::InternalErrorException; +using framing::FramingErrorException; namespace { bool isControl(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == 0; + return f.getMethod() && f.getMethod()->type() == framing::CONTROL; +} +bool isCommand(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == framing::COMMAND; } } // namespace -/** A point in the session - command id + offset */ +SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), offset(o) {} + +// TODO aconway 2008-05-22: Do complete frame sequence validity check here, +// currently duplicated betwen broker and client session impl. +// void SessionPoint::advance(const AMQFrame& f) { - if (f.isLastSegment() && f.isLastFrame()) { - ++command; - offset = 0; + if (isControl(f)) return; // Ignore controls. + if (f.isFirstSegment() && f.isFirstFrame()) { + if (offset != 0) + throw FramingErrorException(QPID_MSG("Unexpected command start frame.")); + if (!isCommand(f)) + throw FramingErrorException( + QPID_MSG("Command start frame has invalid type" << f.getBody()->type())); + if (f.isLastSegment() && f.isLastFrame()) + ++command; // Single-frame command. + else + offset += f.size(); } - else { - // TODO aconway 2008-04-24: if we go to support for partial - // command replay, then it may be better to record the unframed - // data size in a command point rather than the framed size so - // that the relationship of fragment offsets to the replay - // list can be computed more easily. - // - offset += f.size(); + else { // continuation frame for partial command + if (offset == 0) + throw FramingErrorException(QPID_MSG("Unexpected command continuation frame.")); + if (f.isLastSegment() && f.isLastFrame()) { + ++command; + offset = 0; + } + else { + // TODO aconway 2008-04-24: if we go to support for partial + // command replay, then it may be better to record the unframed + // data size in a command point rather than the framed size so + // that the relationship of fragment offsets to the replay + // list can be computed more easily. + // + offset += f.size(); + } } } @@ -65,6 +89,7 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } + SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } @@ -156,8 +181,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) { } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { - if (!receiver.incomplete.contains(command)) - throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command )); + assert(receiver.incomplete.contains(command)); // Internal error to complete command twice. SequenceNumber first =cumulative ? receiver.incomplete.front() : command; SequenceNumber last = command; receiver.unknownCompleted.add(first, last); diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index f72b41ba55..40462e56e7 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -38,7 +38,7 @@ using framing::SequenceSet; /** A point in the session. Points to command id + offset */ struct SessionPoint : boost::totally_ordered1<SessionPoint> { - SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {} + SessionPoint(SequenceNumber command = 0, uint64_t offset = 0); SequenceNumber command; uint64_t offset; @@ -178,6 +178,8 @@ class SessionState { SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. SequenceSet incomplete; // Incomplete received commands. + int segmentType; + } receiver; SessionId id; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index cb5ae01793..dd8267a7d8 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -121,10 +121,8 @@ void SessionState::activateOutput() { Mutex::ScopedLock l(lock); if (isAttached()) getConnection().outputTasks.activateOutput(); - } - //This class could be used as the callback for queue notifications - //if not attached, it can simply ignore the callback, else pass it - //on to the connection + // FIXME aconway 2008-05-22: should we hold the lock over activateOutput?? +} ManagementObject::shared_ptr SessionState::GetManagementObject (void) const { diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index c11f155afb..4089ad79ce 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -68,7 +68,8 @@ void Connection::open(const ConnectionSettings& settings) if (impl) throw Exception(QPID_MSG("Connection::open() was already called")); - impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); + impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); + impl->open(settings.host, settings.port); max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 67ae2293f1..bdafa795c2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -52,7 +52,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti connector.setTimeoutHandler(this); connector.setShutdownHandler(this); - open(settings.host, settings.port); //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); } @@ -135,11 +134,13 @@ ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedL } void ConnectionImpl::closed(uint16_t code, const std::string& text) -{ - Mutex::ScopedLock l(lock); - if (isClosed) return; - SessionVector save(closeInternal(l)); - Mutex::ScopedUnlock u(lock); +{ + SessionVector save; + { + Mutex::ScopedLock l(lock); + if (isClosed) return; + save = closeInternal(l); + } std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 986b044b49..ac28a0f695 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -33,6 +33,7 @@ #include <map> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> namespace qpid { namespace client { @@ -43,7 +44,8 @@ class SessionImpl; class ConnectionImpl : public Bounds, public framing::FrameHandler, public sys::TimeoutHandler, - public sys::ShutdownHandler + public sys::ShutdownHandler, + public boost::enable_shared_from_this<ConnectionImpl> { typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; @@ -59,8 +61,6 @@ class ConnectionImpl : public Bounds, template <class F> void detachAll(const F&); - void open(const std::string& host, int port); - SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); @@ -73,6 +73,8 @@ class ConnectionImpl : public Bounds, ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); + void open(const std::string& host, int port); + void addSession(const boost::shared_ptr<SessionImpl>&); void close(); diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index c9c55c50e8..793809fc7c 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -21,6 +21,7 @@ #include "Connector.h" #include "Bounds.h" +#include "ConnectionImpl.h" #include "ConnectionSettings.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" @@ -42,7 +43,9 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds) +Connector::Connector(ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), version(ver), initiated(false), @@ -52,8 +55,9 @@ Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bo idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), - writer(maxFrameSize, bounds), - aio(0) + writer(maxFrameSize, cimpl), + aio(0), + impl(cimpl) { QPID_LOG(debug, "Connector created for " << version); socket.configure(settings); @@ -294,6 +298,9 @@ void Connector::eof(AsynchIO&) { // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called void Connector::run(){ + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + assert(protect); try { Dispatcher d(poller); diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 9b13e1d519..ce2b23a32e 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -37,6 +37,7 @@ #include "qpid/sys/AsynchIO.h" #include <queue> +#include <boost/weak_ptr.hpp> #include <boost/shared_ptr.hpp> namespace qpid { @@ -45,6 +46,7 @@ namespace client { class Bounds; class ConnectionSettings; +class ConnectionImpl; class Connector : public framing::OutputHandler, private sys::Runnable @@ -121,13 +123,15 @@ class Connector : public framing::OutputHandler, void eof(qpid::sys::AsynchIO&); std::string identifier; + + ConnectionImpl* impl; friend class Channel; public: Connector(framing::ProtocolVersion pVersion, const ConnectionSettings&, - Bounds* bounds = 0); + ConnectionImpl*); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(); diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index aeff35dbf0..801e33d412 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -157,7 +157,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) fix.session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue")); } t.join(); - BOOST_CHECK_EQUAL(count, listener.messages.size()); + BOOST_REQUIRE_EQUAL(count, listener.messages.size()); for (size_t i = 0; i < count; ++i) BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData()); } diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp index 4beef87cfe..40922b3be8 100644 --- a/cpp/src/tests/SessionState.cpp +++ b/cpp/src/tests/SessionState.cpp @@ -65,17 +65,18 @@ string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames // Make a transfer command frame. AMQFrame transferFrame(bool hasContent) { AMQFrame t(in_place<MessageTransferBody>()); - t.setFirstFrame(); - t.setLastFrame(); - t.setFirstSegment(); + t.setFirstFrame(true); + t.setLastFrame(true); + t.setFirstSegment(true); t.setLastSegment(!hasContent); return t; } // Make a content frame AMQFrame contentFrame(string content, bool isLast=true) { AMQFrame f(in_place<AMQContentBody>(content)); - f.setFirstFrame(); - f.setLastFrame(); + f.setFirstFrame(true); + f.setLastFrame(true); + f.setFirstSegment(false); f.setLastSegment(isLast); return f; } |