diff options
| author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
| commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
| tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/client | |
| parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
| download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz | |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 58 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Future.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FutureResponse.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FutureResult.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 366 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 124 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionHandler.cpp | 132 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionHandler.h | 63 | ||||
| -rw-r--r-- | cpp/src/qpid/client/StateManager.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/StateManager.h | 4 |
18 files changed, 452 insertions, 364 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index cef34630db..16e0428a56 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -24,7 +24,6 @@ #include "Channel.h" #include "qpid/sys/Monitor.h" #include "Message.h" -#include "qpid/QpidError.h" #include "Connection.h" #include "Demux.h" #include "FutureResponse.h" @@ -71,7 +70,7 @@ void Channel::open(const Session& s) { Mutex::ScopedLock l(stopLock); if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + throw ChannelBusyException(); active = true; session = s; if(isTransactional()) { @@ -142,7 +141,7 @@ void Channel::consume( Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) - throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag )); Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 0a6a88ae90..932fab8881 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -29,7 +29,7 @@ #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/shared_ptr.h" #include <iostream> #include <sstream> #include <functional> @@ -44,23 +44,26 @@ namespace client { Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl( + shared_ptr<Connector>(new Connector(_version, _debug)))) +{} -Connection::Connection(boost::shared_ptr<Connector> c) : +Connection::Connection(shared_ptr<Connector> c) : channelIdCounter(0), version(framing::highestProtocolVersion), max_frame_size(65536), - impl(new ConnectionImpl(c)), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl(c)) +{} -Connection::~Connection(){} +Connection::~Connection(){ } void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost) { if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + throw Exception(QPID_MSG("Channel object is already open")); impl->open(host, port, uid, pwd, vhost); isOpen = true; @@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) { } void Connection::resume(Session& session) { - shared_ptr<SessionCore> core=session.impl; - core->setChannel(++channelIdCounter); - impl->addSession(core); - core->resume(impl); + session.impl->setChannel(++channelIdCounter); + impl->addSession(session.impl); + session.impl->resume(impl); } void Connection::close() { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 2e5059f135..d2612ca754 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -23,7 +23,6 @@ */ #include <map> #include <string> -#include "qpid/QpidError.h" #include "Channel.h" #include "ConnectionImpl.h" #include "qpid/client/Session.h" @@ -57,10 +56,12 @@ class Connection framing::ChannelId channelIdCounter; framing::ProtocolVersion version; const uint32_t max_frame_size; - shared_ptr<ConnectionImpl> impl; bool isOpen; bool debug; - + + protected: + boost::shared_ptr<ConnectionImpl> impl; + public: /** * Creates a connection object, but does not open the diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 4058bfb33f..a8f10c32a9 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame) try { in(frame); }catch(ConnectionException& e){ - error(e.code, e.toString(), body); + error(e.code, e.what(), body); }catch(std::exception& e){ error(541/*internal error*/, e.what(), body); } @@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { + if (onError) + onError(code, message); AMQMethodBody* method = body->getMethod(); if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index fae93e8294..f9273bc165 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" #include "ConnectionImpl.h" @@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) { handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, connector, _1); - handler.onClose = boost::bind(&ConnectionImpl::closed, this); - handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2); + handler.onClose = boost::bind(&ConnectionImpl::closed, this, + REPLY_SUCCESS, std::string()); + handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); @@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) - throw ChannelErrorException(); + throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel())); s->in(frame); } @@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port, void ConnectionImpl::close() { - assertNotClosed(); - handler.close(); -} - -void ConnectionImpl::closed() -{ - closedByPeer(200, "OK"); -} - -void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) -{ - signalClose(code, text); - connector->close(); + if (!isClosed) + handler.close(); } void ConnectionImpl::idleIn() @@ -110,26 +101,39 @@ void ConnectionImpl::idleOut() connector->send(frame); } +template <class F> +void ConnectionImpl::forChannels(F functor) { + for (SessionMap::iterator i = sessions.begin(); + i != sessions.end(); ++i) { + try { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) functor(*s); + } catch (...) { assert(0); } + } +} + void ConnectionImpl::shutdown() { - //this indicates that the socket to the server has closed - signalClose(0, "Unexpected socket closure."); + Mutex::ScopedLock l(lock); + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionBroke, _1, + INTERNAL_ERROR, "Unexpected socket closure.")); + sessions.clear(); + isClosed = true; } -void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) - s->closed(code, text); - } + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); sessions.clear(); isClosed = true; + connector->close(); } -void ConnectionImpl::assertNotClosed() -{ +void ConnectionImpl::erase(uint16_t ch) { Mutex::ScopedLock l(lock); - if (isClosed) throw Exception("Connection has been closed"); + sessions.erase(ch); } + diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index f20534f1aa..46bd5b685d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; void incoming(framing::AMQFrame& frame); - void closed(); - void closedByPeer(uint16_t, const std::string&); + void closed(uint16_t, const std::string&); void idleOut(); void idleIn(); void shutdown(); - void signalClose(uint16_t, const std::string&); - void assertNotClosed(); -public: + + template <class F> void forChannels(F functor); + + public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); @@ -69,7 +69,9 @@ public: const std::string& pwd = "guest", const std::string& virtualhost = "/"); void close(); - void handle(framing::AMQFrame& frame); + void handle(framing::AMQFrame& frame); + void erase(uint16_t channel); + boost::shared_ptr<Connector> getConnector() { return connector; } }; }} diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b1ec580605..ba11ea5569 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "Connector.h" @@ -36,7 +35,6 @@ namespace client { using namespace qpid::sys; using namespace qpid::framing; -using qpid::QpidError; Connector::Connector( ProtocolVersion ver, bool _debug, uint32_t buffer_size diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 8aaaea247a..af6badd6e0 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler, virtual void setInputHandler(framing::InputHandler* handler); virtual void setTimeoutHandler(sys::TimeoutHandler* handler); virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; } virtual framing::OutputHandler* getOutputHandler(); virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index e4edece414..c70b0fc455 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { if (range.size() % 2) { //must be even number - throw ConnectionException(530, "Received odd number of elements in ranged mark"); + throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); } else { SequenceNumber mark(cumulative); { diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h index 667a19e942..d07f9f149c 100644 --- a/cpp/src/qpid/client/Future.h +++ b/cpp/src/qpid/client/Future.h @@ -63,7 +63,7 @@ public: boost::bind(&FutureCompletion::completed, &callback) ); callback.waitForCompletion(); - session.checkClosed(); + session.assertOpen(); complete = true; } } diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index 73b7c3a7a6..5d36a1d873 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -31,7 +31,7 @@ using namespace qpid::sys; AMQMethodBody* FutureResponse::getResponse(SessionCore& session) { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return response.get(); } diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp index a523129206..681202edea 100644 --- a/cpp/src/qpid/client/FutureResult.cpp +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -30,7 +30,7 @@ using namespace qpid::sys; const std::string& FutureResult::getResult(SessionCore& session) const { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return result; } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 966d07eaef..27440465fe 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -24,105 +24,301 @@ #include "FutureResponse.h" #include "FutureResult.h" #include "ConnectionImpl.h" - +#include "qpid/framing/FrameSet.h" #include "qpid/framing/constants.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> -using namespace qpid::client; +namespace qpid { +namespace client { + using namespace qpid::framing; -SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize) - : connection(conn), channel(ch), l2(*this), l3(maxFrameSize), - uuid(false), sync(false) +namespace { const std::string OK="ok"; } + +typedef sys::Monitor::ScopedLock Lock; +typedef sys::Monitor::ScopedUnlock UnLock; + +inline void SessionCore::invariant() const { + switch (state.get()) { + case OPENING: + assert(!session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case RESUMING: + assert(session); + assert(session->getState() == SessionState::RESUMING); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case OPEN: + case CLOSING: + case SUSPENDING: + assert(session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case SUSPENDED: + assert(code==REPLY_SUCCESS); + assert(session); + assert(!connection); + break; + case CLOSED: + assert(!session); + assert(!connection); + break; + } +} + +inline void SessionCore::setState(State s) { + state = s; + invariant(); +} + +inline void SessionCore::waitFor(State s) { + invariant(); + // We can be CLOSED or SUSPENDED by error at any time. + state.waitFor(States(s, CLOSED, SUSPENDED)); + check(); + assert(state==s); + invariant(); +} + +SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, + uint16_t ch, uint64_t maxFrameSize) + : l3(maxFrameSize), + sync(false), + channel(ch), + proxy(channel), + state(OPENING) { - l2.next = &l3; l3.out = &out; - out.next = connection.get(); + attaching(conn); } -SessionCore::~SessionCore() {} +void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { + assert(c); + assert(channel.get()); + connection = c; + channel.next = connection.get(); + code = REPLY_SUCCESS; + text = OK; + state = session ? RESUMING : OPENING; + invariant(); +} -ExecutionHandler& SessionCore::getExecution() -{ - checkClosed(); - return l3; +SessionCore::~SessionCore() { + Lock l(state); + invariant(); + detach(COMMAND_INVALID, "Session deleted"); + state.waitAll(); } -FrameSet::shared_ptr SessionCore::get() -{ - checkClosed(); - return l3.getDemux().getDefault().pop(); +void SessionCore::detach(int c, const std::string& t) { + connection.reset(); + channel.next = 0; + code=c; + text=t; } -void SessionCore::setSync(bool s) -{ +void SessionCore::doClose(int code, const std::string& text) { + if (state != CLOSED) { + session.reset(); + l3.getDemux().close(); + l3.getCompletionTracker().close(); + detach(code, text); + setState(CLOSED); + } + invariant(); +} + +void SessionCore::doSuspend(int code, const std::string& text) { + if (state != CLOSED) { + invariant(); + detach(code, text); + session->suspend(); + setState(SUSPENDED); + } +} + +ExecutionHandler& SessionCore::getExecution() { // user thread + return l3; +} + +void SessionCore::setSync(bool s) { // user thread sync = s; } -bool SessionCore::isSync() -{ +bool SessionCore::isSync() { // user thread return sync; } -namespace { -struct ClosedOnExit { - SessionCore& core; - int code; - std::string text; - ClosedOnExit(SessionCore& s, int c, const std::string& t) - : core(s), code(c), text(t) {} - ~ClosedOnExit() { core.closed(code, text); } -}; +FrameSet::shared_ptr SessionCore::get() { // user thread + // No lock here: pop does a blocking wait. + return l3.getDemux().getDefault().pop(); +} + +void SessionCore::open(uint32_t detachedLifetime) { // user thread + Lock l(state); + check(state==OPENING && !session, + COMMAND_INVALID, QPID_MSG("Cannot re-open a session.")); + proxy.open(detachedLifetime); + waitFor(OPEN); +} + +void SessionCore::close() { // user thread + Lock l(state); + check(); + if (state==OPEN) { + setState(CLOSING); + proxy.close(); + waitFor(CLOSED); + } + else + doClose(REPLY_SUCCESS, OK); +} + +void SessionCore::suspend() { // user thread + Lock l(state); + checkOpen(); + setState(SUSPENDING); + proxy.suspend(); + waitFor(SUSPENDED); } -void SessionCore::close() +void SessionCore::setChannel(uint16_t ch) { channel=ch; } + +void SessionCore::resume(shared_ptr<ConnectionImpl> c) { + // user thread + { + Lock l(state); + if (state==OPEN) + doSuspend(REPLY_SUCCESS, OK); + check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed.")); + SequenceNumber sendAck=session->resuming(); + attaching(c); + proxy.resume(getId()); + waitFor(OPEN); + proxy.ack(sendAck, SequenceNumberSet()); + // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem + // for large replay sets. + SessionState::Replay replay=session->replay(); + for (SessionState::Replay::iterator i = replay.begin(); + i != replay.end(); ++i) + { + invariant(); + channel.handle(*i); // Direct to channel. + check(); + } + } +} + +void SessionCore::assertOpen() const { + Lock l(state); + checkOpen(); +} + +// network thread +void SessionCore::attached(const Uuid& sessionId, + uint32_t /*detachedLifetime*/) { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user."); - l2.close(); + Lock l(state); + invariant(); + check(state == OPENING || state == RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.attached")); + if (state==OPENING) { // New session + // FIXME aconway 2007-10-17: arbitrary ack value of 100 for + // client, allow configuration. + session=in_place<SessionState>(100, sessionId); + setState(OPEN); + } + else { // RESUMING + check(sessionId == session->getId(), + INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID.")); + // Don't setState yet, wait for first incoming ack. + } } -void SessionCore::suspend() { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended"); - l2.suspend(); +void SessionCore::detached() { // network thread + Lock l(state); + check(state == SUSPENDING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.detached.")); + connection->erase(channel); + doSuspend(REPLY_SUCCESS, OK); +} + +void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) { + Lock l(state); + invariant(); + check(state==OPEN || state==RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.ack")); + session->receivedAck(ack); + if (state==RESUMING) { + setState(OPEN); + } + invariant(); } void SessionCore::closed(uint16_t code, const std::string& text) -{ - out.next = 0; - reason.code = code; - reason.text = text; - l2.closed(); - l3.getDemux().close(); - l3.getCompletionTracker().close(); +{ // network thread + Lock l(state); + invariant(); + doClose(code, text); } -void SessionCore::checkClosed() const -{ - // TODO: could have been a connection exception - if(out.next == 0) - throw ChannelException(reason.code, reason.text); +// closed by connection +void SessionCore::connectionClosed(uint16_t code, const std::string& text) { + Lock l(state); + try { + doClose(code, text); + } catch(...) { assert (0); } } -void SessionCore::open(uint32_t detachedLifetime) { - assert(out.next); - l2.open(detachedLifetime); +void SessionCore::connectionBroke(uint16_t code, const std::string& text) { + Lock l(state); + try { + doSuspend(code, text); + } catch (...) { assert(0); } } -void SessionCore::resume(shared_ptr<ConnectionImpl> conn) { - connection = conn; - out.next = connection.get(); - l2.resume(); +void SessionCore::check() const { // Called with lock held. + invariant(); + if (code != REPLY_SUCCESS) + throwReplyException(code, text); +} + +void SessionCore::check(bool cond, int newCode, const std::string& msg) const { + check(); + if (!cond) { + const_cast<SessionCore*>(this)->doClose(newCode, msg); + throwReplyException(code, text); + } } -Future SessionCore::send(const AMQBody& command) -{ - checkClosed(); +void SessionCore::checkOpen() const { + if (state==SUSPENDED) { + std::string cause; + if (code != REPLY_SUCCESS) + cause=" by :"+text; + throw CommandInvalidException(QPID_MSG("Session is suspended" << cause)); + } + check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open")); +} +Future SessionCore::send(const AMQBody& command) +{ + Lock l(state); + checkOpen(); command.getMethod()->setSync(sync); - Future f; //any result/response listeners must be set before the command is sent if (command.getMethod()->resultExpected()) { @@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command) Future SessionCore::send(const AMQBody& command, const MethodContent& content) { - checkClosed(); + Lock l(state); + checkOpen(); //content bearing methods don't currently have responses or //results, if that changes should follow procedure for the other //send method impl: return Future(l3.send(command, content)); } +// Network thread. void SessionCore::handleIn(AMQFrame& frame) { - l2.handle(frame); + try { + // Cast to expose private SessionHandler functions. + if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + session->received(frame); + l3.handle(frame); + } + } catch (const ChannelException& e) { + QPID_LOG(error, "Channel exception:" << e.what()); + doClose(e.code, e.what()); + } } void SessionCore::handleOut(AMQFrame& frame) { - checkClosed(); - frame.setChannel(channel); - out.next->handle(frame); + Lock l(state); + if (state==OPEN) { + if (session->sent(frame)) + proxy.solicitAck(); + channel.handle(frame); + } +} + +void SessionCore::solicitAck( ) { + Lock l(state); + checkOpen(); + proxy.ack(session->sendingAck(), SequenceNumberSet()); +} + +void SessionCore::flow(bool) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::flowOk(bool /*active*/) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) { + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.highWaterMark"); +} + +const Uuid SessionCore::getId() const { + if (session) + return session->getId(); + throw Exception(QPID_MSG("Closed session, no ID.")); } +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index ac109e1f5c..38c72359a3 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -22,17 +22,25 @@ #ifndef _SessionCore_ #define _SessionCore_ -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include "qpid/framing/AMQMethodBody.h" +#include "qpid/shared_ptr.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/Uuid.h" -#include "SessionHandler.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/StateMonitor.h" #include "ExecutionHandler.h" +#include <boost/optional.hpp> + namespace qpid { +namespace framing { +class FrameSet; +class MethodContent; +class SequenceNumberSet; +} + namespace client { class Future; @@ -43,60 +51,90 @@ class ConnectionImpl; * Attaches to a SessionHandler when active, detaches * when closed. */ -class SessionCore : public framing::FrameHandler::InOutHandler +class SessionCore : public framing::FrameHandler::InOutHandler, + private framing::AMQP_ClientOperations::SessionHandler { - struct Reason - { - uint16_t code; - std::string text; - }; - - shared_ptr<ConnectionImpl> connection; - uint16_t channel; - SessionHandler l2; - ExecutionHandler l3; - framing::Uuid uuid; - volatile bool sync; - Reason reason; - - protected: - void handleIn(framing::AMQFrame& frame); - void handleOut(framing::AMQFrame& frame); - public: SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); ~SessionCore(); framing::FrameSet::shared_ptr get(); + const framing::Uuid getId() const; + uint16_t getChannel() const { return channel; } + void assertOpen() const; - framing::Uuid getId() const { return uuid; } - void setId(const framing::Uuid& id) { uuid= id; } - - uint16_t getChannel() const { assert(channel); return channel; } - void setChannel(uint16_t ch) { assert(ch); channel=ch; } - + // NOTE: Public functions called in user thread. void open(uint32_t detachedLifetime); - - /** Closed by client code */ void close(); - - /** Closed by peer */ - void closed(uint16_t code, const std::string& text); - void resume(shared_ptr<ConnectionImpl>); void suspend(); + void setChannel(uint16_t channel); - void setSync(bool); + void setSync(bool s); bool isSync(); ExecutionHandler& getExecution(); - void checkClosed() const; Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); -}; -} -} + void connectionClosed(uint16_t code, const std::string& text); + void connectionBroke(uint16_t code, const std::string& text); + + private: + enum State { + OPENING, + RESUMING, + OPEN, + CLOSING, + SUSPENDING, + SUSPENDED, + CLOSED + }; + typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; + typedef sys::StateMonitor<State, CLOSED> StateMonitor; + typedef StateMonitor::Set States; + + inline void invariant() const; + inline void setState(State s); + inline void waitFor(State); + void doClose(int code, const std::string& text); + void doSuspend(int code, const std::string& text); + + /** If there is an error, throw the exception */ + void check(bool condition, int code, const std::string& text) const; + /** Throw if *error */ + void check() const; + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + // Private functions are called by broker in network thread. + void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); + void flow(bool active); + void flowOk(bool active); + void detached(); + void ack(uint32_t cumulativeSeenMark, + const framing::SequenceNumberSet& seenFrameSet); + void highWaterMark(uint32_t lastSentMark); + void solicitAck(); + void closed(uint16_t code, const std::string& text); + + void attaching(shared_ptr<ConnectionImpl>); + void detach(int code, const std::string& text); + void checkOpen() const; + + int code; // Error code + std::string text; // Error text + boost::optional<framing::SessionState> session; + shared_ptr<ConnectionImpl> connection; + ExecutionHandler l3; + volatile bool sync; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy::Session proxy; + mutable StateMonitor state; +}; +}} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp deleted file mode 100644 index d3b04e5356..0000000000 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/client/SessionCore.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; - -namespace { -// TODO aconway 2007-09-28: hack till we have multi-version support. -ProtocolVersion version; -} - -SessionHandler::SessionHandler(SessionCore& parent) - : StateManager(CLOSED), core(parent) {} - -SessionHandler::~SessionHandler() {} - -void SessionHandler::handle(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - if (getState() == OPEN) { - core.checkClosed(); - SessionClosedBody* closedBody= - dynamic_cast<SessionClosedBody*>(body->getMethod()); - if (closedBody) { - closed(); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } else { - try { - next->handle(frame); - } - catch(const ChannelException& e){ - QPID_LOG(error, "Channel exception:" << e.what()); - closed(); - AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); - core.out(f); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } - } - } else { - if (body->getMethod()) - handleMethod(body->getMethod()); - else - throw ConnectionException(504, "Channel not open for content."); - } -} - -void SessionHandler::attach(const AMQMethodBody& command) -{ - setState(OPENING); - AMQFrame f(0, command); - core.out(f); - std::set<int> states; - states.insert(OPEN); - states.insert(CLOSED); - waitFor(states); - if (getState() != OPEN) - throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel())); -} - -void SessionHandler::open(uint32_t detachedLifetime) { - attach(SessionOpenBody(version, detachedLifetime)); -} - -void SessionHandler::resume() { - attach(SessionResumeBody(version, core.getId())); -} - -void SessionHandler::detach(const AMQMethodBody& command) -{ - setState(CLOSING); - AMQFrame f(0, command); - core.out(f); - waitFor(CLOSED); -} - -void SessionHandler::close() { detach(SessionCloseBody(version)); } -void SessionHandler::suspend() { detach(SessionSuspendBody(version)); } -void SessionHandler::closed() { setState(CLOSED); } - -void SessionHandler::handleMethod(AMQMethodBody* method) -{ - switch (getState()) { - case OPENING: { - SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method); - if (attached) { - core.setId(attached->getSessionId()); - setState(OPEN); - } else - throw ChannelErrorException(); - break; - } - case CLOSING: - if (method->isA<SessionClosedBody>() || - method->isA<SessionDetachedBody>()) - closed(); - break; - - case CLOSED: - throw ChannelErrorException(); - - default: - assert(0); - throw InternalErrorException(QPID_MSG("Internal Error.")); - } -} - diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h deleted file mode 100644 index 994b8402de..0000000000 --- a/cpp/src/qpid/client/SessionHandler.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandler_ -#define _SessionHandler_ - -#include "StateManager.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/Uuid.h" -#include "qpid/shared_ptr.h" - -namespace qpid { -namespace client { -class SessionCore; - -/** - * Handles incoming session (L2) commands. - */ -class SessionHandler : public framing::FrameHandler, - private StateManager -{ - enum STATES {OPENING, OPEN, CLOSING, CLOSED}; - SessionCore& core; - - void handleMethod(framing::AMQMethodBody* method); - void attach(const framing::AMQMethodBody&); - void detach(const framing::AMQMethodBody&); - - public: - SessionHandler(SessionCore& parent); - ~SessionHandler(); - - /** Incoming from broker */ - void handle(framing::AMQFrame&); - - void open(uint32_t detachedLifetime); - void resume(); - void close(); - void closed(); - void suspend(); -}; - -}} - -#endif diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp index b72967c098..0cb3c6b9d4 100644 --- a/cpp/src/qpid/client/StateManager.cpp +++ b/cpp/src/qpid/client/StateManager.cpp @@ -60,7 +60,7 @@ void StateManager::setState(int s) stateLock.notifyAll(); } -int StateManager::getState() +int StateManager::getState() const { Monitor::ScopedLock l(stateLock); return state; diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h index fd0c1b7f86..2f8ecb772c 100644 --- a/cpp/src/qpid/client/StateManager.h +++ b/cpp/src/qpid/client/StateManager.h @@ -30,12 +30,12 @@ namespace client { class StateManager { int state; - sys::Monitor stateLock; + mutable sys::Monitor stateLock; public: StateManager(int initial); void setState(int state); - int getState(); + int getState() const ; void waitForStateChange(int current); void waitFor(std::set<int> states); void waitFor(int state); |
