summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-23 13:39:07 +0000
committerAlan Conway <aconway@apache.org>2008-05-23 13:39:07 +0000
commit896d94f7c27e958806b96b537a7a96208ede145a (patch)
tree35c139fcc037fde8fef675d9d730882d22781931 /cpp
parentbb4584d228b837fa70839560d72bc2a59dc1aa17 (diff)
downloadqpid-python-896d94f7c27e958806b96b537a7a96208ede145a.tar.gz
qpid::SessionState: Added error checking for invalid frame sequences.
client: Fix client crash on error during connection shutdown. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/framing.0-10/constants.rb1
-rw-r--r--cpp/src/qpid/SessionState.cpp66
-rw-r--r--cpp/src/qpid/SessionState.h4
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp6
-rw-r--r--cpp/src/qpid/client/Connection.cpp3
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp13
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h8
-rw-r--r--cpp/src/qpid/client/Connector.cpp13
-rw-r--r--cpp/src/qpid/client/Connector.h6
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/SessionState.cpp11
11 files changed, 87 insertions, 46 deletions
diff --git a/cpp/rubygen/framing.0-10/constants.rb b/cpp/rubygen/framing.0-10/constants.rb
index 752f50b6e9..df74ac7459 100755
--- a/cpp/rubygen/framing.0-10/constants.rb
+++ b/cpp/rubygen/framing.0-10/constants.rb
@@ -23,6 +23,7 @@ class ConstantsGen < CppGen
}
genl l.join(",\n")
}
+ define_constants_for(@amqp.domain("segment-type").enum)
namespace("execution") {
define_constants_for(@amqp.class_("execution").domain("error-code").enum)
}
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 96f2cca726..28e433b911 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -20,7 +20,7 @@
*/
#include "SessionState.h"
-#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -28,32 +28,56 @@
namespace qpid {
using framing::AMQFrame;
-using amqp_0_10::NotImplementedException;
-using amqp_0_10::InvalidArgumentException;
-using amqp_0_10::IllegalStateException;
-using amqp_0_10::ResourceLimitExceededException;
-using amqp_0_10::InternalErrorException;
+using framing::NotImplementedException;
+using framing::InvalidArgumentException;
+using framing::IllegalStateException;
+using framing::ResourceLimitExceededException;
+using framing::InternalErrorException;
+using framing::FramingErrorException;
namespace {
bool isControl(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == 0;
+ return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
}
} // namespace
-/** A point in the session - command id + offset */
+SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), offset(o) {}
+
+// TODO aconway 2008-05-22: Do complete frame sequence validity check here,
+// currently duplicated betwen broker and client session impl.
+//
void SessionPoint::advance(const AMQFrame& f) {
- if (f.isLastSegment() && f.isLastFrame()) {
- ++command;
- offset = 0;
+ if (isControl(f)) return; // Ignore controls.
+ if (f.isFirstSegment() && f.isFirstFrame()) {
+ if (offset != 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command start frame."));
+ if (!isCommand(f))
+ throw FramingErrorException(
+ QPID_MSG("Command start frame has invalid type" << f.getBody()->type()));
+ if (f.isLastSegment() && f.isLastFrame())
+ ++command; // Single-frame command.
+ else
+ offset += f.size();
}
- else {
- // TODO aconway 2008-04-24: if we go to support for partial
- // command replay, then it may be better to record the unframed
- // data size in a command point rather than the framed size so
- // that the relationship of fragment offsets to the replay
- // list can be computed more easily.
- //
- offset += f.size();
+ else { // continuation frame for partial command
+ if (offset == 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command continuation frame."));
+ if (f.isLastSegment() && f.isLastFrame()) {
+ ++command;
+ offset = 0;
+ }
+ else {
+ // TODO aconway 2008-04-24: if we go to support for partial
+ // command replay, then it may be better to record the unframed
+ // data size in a command point rather than the framed size so
+ // that the relationship of fragment offsets to the replay
+ // list can be computed more easily.
+ //
+ offset += f.size();
+ }
}
}
@@ -65,6 +89,7 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
+
SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
@@ -156,8 +181,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
- if (!receiver.incomplete.contains(command))
- throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command ));
+ assert(receiver.incomplete.contains(command)); // Internal error to complete command twice.
SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
SequenceNumber last = command;
receiver.unknownCompleted.add(first, last);
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index f72b41ba55..40462e56e7 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -38,7 +38,7 @@ using framing::SequenceSet;
/** A point in the session. Points to command id + offset */
struct SessionPoint : boost::totally_ordered1<SessionPoint> {
- SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {}
+ SessionPoint(SequenceNumber command = 0, uint64_t offset = 0);
SequenceNumber command;
uint64_t offset;
@@ -178,6 +178,8 @@ class SessionState {
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
+ int segmentType;
+
} receiver;
SessionId id;
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index cb5ae01793..dd8267a7d8 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -121,10 +121,8 @@ void SessionState::activateOutput() {
Mutex::ScopedLock l(lock);
if (isAttached())
getConnection().outputTasks.activateOutput();
- }
- //This class could be used as the callback for queue notifications
- //if not attached, it can simply ignore the callback, else pass it
- //on to the connection
+ // FIXME aconway 2008-05-22: should we hold the lock over activateOutput??
+}
ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
{
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index c11f155afb..4089ad79ce 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -68,7 +68,8 @@ void Connection::open(const ConnectionSettings& settings)
if (impl)
throw Exception(QPID_MSG("Connection::open() was already called"));
- impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+ impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+ impl->open(settings.host, settings.port);
max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 67ae2293f1..bdafa795c2 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -52,7 +52,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
connector.setTimeoutHandler(this);
connector.setShutdownHandler(this);
- open(settings.host, settings.port);
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
}
@@ -135,11 +134,13 @@ ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedL
}
void ConnectionImpl::closed(uint16_t code, const std::string& text)
-{
- Mutex::ScopedLock l(lock);
- if (isClosed) return;
- SessionVector save(closeInternal(l));
- Mutex::ScopedUnlock u(lock);
+{
+ SessionVector save;
+ {
+ Mutex::ScopedLock l(lock);
+ if (isClosed) return;
+ save = closeInternal(l);
+ }
std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 986b044b49..ac28a0f695 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -33,6 +33,7 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
namespace qpid {
namespace client {
@@ -43,7 +44,8 @@ class SessionImpl;
class ConnectionImpl : public Bounds,
public framing::FrameHandler,
public sys::TimeoutHandler,
- public sys::ShutdownHandler
+ public sys::ShutdownHandler,
+ public boost::enable_shared_from_this<ConnectionImpl>
{
typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
@@ -59,8 +61,6 @@ class ConnectionImpl : public Bounds,
template <class F> void detachAll(const F&);
- void open(const std::string& host, int port);
-
SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
@@ -73,6 +73,8 @@ class ConnectionImpl : public Bounds,
ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
~ConnectionImpl();
+ void open(const std::string& host, int port);
+
void addSession(const boost::shared_ptr<SessionImpl>&);
void close();
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index c9c55c50e8..793809fc7c 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -21,6 +21,7 @@
#include "Connector.h"
#include "Bounds.h"
+#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
@@ -42,7 +43,9 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds)
+Connector::Connector(ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
version(ver),
initiated(false),
@@ -52,8 +55,9 @@ Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bo
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
- writer(maxFrameSize, bounds),
- aio(0)
+ writer(maxFrameSize, cimpl),
+ aio(0),
+ impl(cimpl)
{
QPID_LOG(debug, "Connector created for " << version);
socket.configure(settings);
@@ -294,6 +298,9 @@ void Connector::eof(AsynchIO&) {
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
void Connector::run(){
+ // Keep the connection impl in memory until run() completes.
+ boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ assert(protect);
try {
Dispatcher d(poller);
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 9b13e1d519..ce2b23a32e 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -37,6 +37,7 @@
#include "qpid/sys/AsynchIO.h"
#include <queue>
+#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -45,6 +46,7 @@ namespace client {
class Bounds;
class ConnectionSettings;
+class ConnectionImpl;
class Connector : public framing::OutputHandler,
private sys::Runnable
@@ -121,13 +123,15 @@ class Connector : public framing::OutputHandler,
void eof(qpid::sys::AsynchIO&);
std::string identifier;
+
+ ConnectionImpl* impl;
friend class Channel;
public:
Connector(framing::ProtocolVersion pVersion,
const ConnectionSettings&,
- Bounds* bounds = 0);
+ ConnectionImpl*);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init();
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index aeff35dbf0..801e33d412 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -157,7 +157,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
fix.session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue"));
}
t.join();
- BOOST_CHECK_EQUAL(count, listener.messages.size());
+ BOOST_REQUIRE_EQUAL(count, listener.messages.size());
for (size_t i = 0; i < count; ++i)
BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
}
diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp
index 4beef87cfe..40922b3be8 100644
--- a/cpp/src/tests/SessionState.cpp
+++ b/cpp/src/tests/SessionState.cpp
@@ -65,17 +65,18 @@ string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames
// Make a transfer command frame.
AMQFrame transferFrame(bool hasContent) {
AMQFrame t(in_place<MessageTransferBody>());
- t.setFirstFrame();
- t.setLastFrame();
- t.setFirstSegment();
+ t.setFirstFrame(true);
+ t.setLastFrame(true);
+ t.setFirstSegment(true);
t.setLastSegment(!hasContent);
return t;
}
// Make a content frame
AMQFrame contentFrame(string content, bool isLast=true) {
AMQFrame f(in_place<AMQContentBody>(content));
- f.setFirstFrame();
- f.setLastFrame();
+ f.setFirstFrame(true);
+ f.setLastFrame(true);
+ f.setFirstSegment(false);
f.setLastSegment(isLast);
return f;
}