summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/client
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Channel.cpp5
-rw-r--r--cpp/src/qpid/client/Connection.cpp26
-rw-r--r--cpp/src/qpid/client/Connection.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp58
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h14
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.h1
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/Future.h2
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp2
-rw-r--r--cpp/src/qpid/client/FutureResult.cpp2
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp366
-rw-r--r--cpp/src/qpid/client/SessionCore.h124
-rw-r--r--cpp/src/qpid/client/SessionHandler.cpp132
-rw-r--r--cpp/src/qpid/client/SessionHandler.h63
-rw-r--r--cpp/src/qpid/client/StateManager.cpp2
-rw-r--r--cpp/src/qpid/client/StateManager.h4
18 files changed, 452 insertions, 364 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index cef34630db..16e0428a56 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -24,7 +24,6 @@
#include "Channel.h"
#include "qpid/sys/Monitor.h"
#include "Message.h"
-#include "qpid/QpidError.h"
#include "Connection.h"
#include "Demux.h"
#include "FutureResponse.h"
@@ -71,7 +70,7 @@ void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+ throw ChannelBusyException();
active = true;
session = s;
if(isTransactional()) {
@@ -142,7 +141,7 @@ void Channel::consume(
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
- throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 0a6a88ae90..932fab8881 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -29,7 +29,7 @@
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/shared_ptr.h"
#include <iostream>
#include <sstream>
#include <functional>
@@ -44,23 +44,26 @@ namespace client {
Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
channelIdCounter(0), version(_version),
max_frame_size(_max_frame_size),
- impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(
+ shared_ptr<Connector>(new Connector(_version, _debug))))
+{}
-Connection::Connection(boost::shared_ptr<Connector> c) :
+Connection::Connection(shared_ptr<Connector> c) :
channelIdCounter(0), version(framing::highestProtocolVersion),
max_frame_size(65536),
- impl(new ConnectionImpl(c)),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(c))
+{}
-Connection::~Connection(){}
+Connection::~Connection(){ }
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
{
if (isOpen)
- THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+ throw Exception(QPID_MSG("Channel object is already open"));
impl->open(host, port, uid, pwd, vhost);
isOpen = true;
@@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) {
}
void Connection::resume(Session& session) {
- shared_ptr<SessionCore> core=session.impl;
- core->setChannel(++channelIdCounter);
- impl->addSession(core);
- core->resume(impl);
+ session.impl->setChannel(++channelIdCounter);
+ impl->addSession(session.impl);
+ session.impl->resume(impl);
}
void Connection::close() {
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 2e5059f135..d2612ca754 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "qpid/QpidError.h"
#include "Channel.h"
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
@@ -57,10 +56,12 @@ class Connection
framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
const uint32_t max_frame_size;
- shared_ptr<ConnectionImpl> impl;
bool isOpen;
bool debug;
-
+
+ protected:
+ boost::shared_ptr<ConnectionImpl> impl;
+
public:
/**
* Creates a connection object, but does not open the
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 4058bfb33f..a8f10c32a9 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame)
try {
in(frame);
}catch(ConnectionException& e){
- error(e.code, e.toString(), body);
+ error(e.code, e.what(), body);
}catch(std::exception& e){
error(541/*internal error*/, e.what(), body);
}
@@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_
void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
+ if (onError)
+ onError(code, message);
AMQMethodBody* method = body->getMethod();
if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index fae93e8294..f9273bc165 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
#include "ConnectionImpl.h"
@@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&ConnectionImpl::closed, this);
- handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
+ handler.onClose = boost::bind(&ConnectionImpl::closed, this,
+ REPLY_SUCCESS, std::string());
+ handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
@@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException();
+ throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port,
void ConnectionImpl::close()
{
- assertNotClosed();
- handler.close();
-}
-
-void ConnectionImpl::closed()
-{
- closedByPeer(200, "OK");
-}
-
-void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
-{
- signalClose(code, text);
- connector->close();
+ if (!isClosed)
+ handler.close();
}
void ConnectionImpl::idleIn()
@@ -110,26 +101,39 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
+template <class F>
+void ConnectionImpl::forChannels(F functor) {
+ for (SessionMap::iterator i = sessions.begin();
+ i != sessions.end(); ++i) {
+ try {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) functor(*s);
+ } catch (...) { assert(0); }
+ }
+}
+
void ConnectionImpl::shutdown()
{
- //this indicates that the socket to the server has closed
- signalClose(0, "Unexpected socket closure.");
+ Mutex::ScopedLock l(lock);
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionBroke, _1,
+ INTERNAL_ERROR, "Unexpected socket closure."));
+ sessions.clear();
+ isClosed = true;
}
-void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s)
- s->closed(code, text);
- }
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
sessions.clear();
isClosed = true;
+ connector->close();
}
-void ConnectionImpl::assertNotClosed()
-{
+void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
- if (isClosed) throw Exception("Connection has been closed");
+ sessions.erase(ch);
}
+
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index f20534f1aa..46bd5b685d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler,
bool isClosed;
void incoming(framing::AMQFrame& frame);
- void closed();
- void closedByPeer(uint16_t, const std::string&);
+ void closed(uint16_t, const std::string&);
void idleOut();
void idleIn();
void shutdown();
- void signalClose(uint16_t, const std::string&);
- void assertNotClosed();
-public:
+
+ template <class F> void forChannels(F functor);
+
+ public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
@@ -69,7 +69,9 @@ public:
const std::string& pwd = "guest",
const std::string& virtualhost = "/");
void close();
- void handle(framing::AMQFrame& frame);
+ void handle(framing::AMQFrame& frame);
+ void erase(uint16_t channel);
+ boost::shared_ptr<Connector> getConnector() { return connector; }
};
}}
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index b1ec580605..ba11ea5569 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
@@ -36,7 +35,6 @@ namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
-using qpid::QpidError;
Connector::Connector(
ProtocolVersion ver, bool _debug, uint32_t buffer_size
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 8aaaea247a..af6badd6e0 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler,
virtual void setInputHandler(framing::InputHandler* handler);
virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+ virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
virtual framing::OutputHandler* getOutputHandler();
virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index e4edece414..c70b0fc455 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
if (range.size() % 2) { //must be even number
- throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark"));
} else {
SequenceNumber mark(cumulative);
{
diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h
index 667a19e942..d07f9f149c 100644
--- a/cpp/src/qpid/client/Future.h
+++ b/cpp/src/qpid/client/Future.h
@@ -63,7 +63,7 @@ public:
boost::bind(&FutureCompletion::completed, &callback)
);
callback.waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
complete = true;
}
}
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 73b7c3a7a6..5d36a1d873 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -31,7 +31,7 @@ using namespace qpid::sys;
AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return response.get();
}
diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp
index a523129206..681202edea 100644
--- a/cpp/src/qpid/client/FutureResult.cpp
+++ b/cpp/src/qpid/client/FutureResult.cpp
@@ -30,7 +30,7 @@ using namespace qpid::sys;
const std::string& FutureResult::getResult(SessionCore& session) const
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return result;
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 966d07eaef..27440465fe 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -24,105 +24,301 @@
#include "FutureResponse.h"
#include "FutureResult.h"
#include "ConnectionImpl.h"
-
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-using namespace qpid::client;
+namespace qpid {
+namespace client {
+
using namespace qpid::framing;
-SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize)
- : connection(conn), channel(ch), l2(*this), l3(maxFrameSize),
- uuid(false), sync(false)
+namespace { const std::string OK="ok"; }
+
+typedef sys::Monitor::ScopedLock Lock;
+typedef sys::Monitor::ScopedUnlock UnLock;
+
+inline void SessionCore::invariant() const {
+ switch (state.get()) {
+ case OPENING:
+ assert(!session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case RESUMING:
+ assert(session);
+ assert(session->getState() == SessionState::RESUMING);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case OPEN:
+ case CLOSING:
+ case SUSPENDING:
+ assert(session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case SUSPENDED:
+ assert(code==REPLY_SUCCESS);
+ assert(session);
+ assert(!connection);
+ break;
+ case CLOSED:
+ assert(!session);
+ assert(!connection);
+ break;
+ }
+}
+
+inline void SessionCore::setState(State s) {
+ state = s;
+ invariant();
+}
+
+inline void SessionCore::waitFor(State s) {
+ invariant();
+ // We can be CLOSED or SUSPENDED by error at any time.
+ state.waitFor(States(s, CLOSED, SUSPENDED));
+ check();
+ assert(state==s);
+ invariant();
+}
+
+SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
+ uint16_t ch, uint64_t maxFrameSize)
+ : l3(maxFrameSize),
+ sync(false),
+ channel(ch),
+ proxy(channel),
+ state(OPENING)
{
- l2.next = &l3;
l3.out = &out;
- out.next = connection.get();
+ attaching(conn);
}
-SessionCore::~SessionCore() {}
+void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
+ assert(c);
+ assert(channel.get());
+ connection = c;
+ channel.next = connection.get();
+ code = REPLY_SUCCESS;
+ text = OK;
+ state = session ? RESUMING : OPENING;
+ invariant();
+}
-ExecutionHandler& SessionCore::getExecution()
-{
- checkClosed();
- return l3;
+SessionCore::~SessionCore() {
+ Lock l(state);
+ invariant();
+ detach(COMMAND_INVALID, "Session deleted");
+ state.waitAll();
}
-FrameSet::shared_ptr SessionCore::get()
-{
- checkClosed();
- return l3.getDemux().getDefault().pop();
+void SessionCore::detach(int c, const std::string& t) {
+ connection.reset();
+ channel.next = 0;
+ code=c;
+ text=t;
}
-void SessionCore::setSync(bool s)
-{
+void SessionCore::doClose(int code, const std::string& text) {
+ if (state != CLOSED) {
+ session.reset();
+ l3.getDemux().close();
+ l3.getCompletionTracker().close();
+ detach(code, text);
+ setState(CLOSED);
+ }
+ invariant();
+}
+
+void SessionCore::doSuspend(int code, const std::string& text) {
+ if (state != CLOSED) {
+ invariant();
+ detach(code, text);
+ session->suspend();
+ setState(SUSPENDED);
+ }
+}
+
+ExecutionHandler& SessionCore::getExecution() { // user thread
+ return l3;
+}
+
+void SessionCore::setSync(bool s) { // user thread
sync = s;
}
-bool SessionCore::isSync()
-{
+bool SessionCore::isSync() { // user thread
return sync;
}
-namespace {
-struct ClosedOnExit {
- SessionCore& core;
- int code;
- std::string text;
- ClosedOnExit(SessionCore& s, int c, const std::string& t)
- : core(s), code(c), text(t) {}
- ~ClosedOnExit() { core.closed(code, text); }
-};
+FrameSet::shared_ptr SessionCore::get() { // user thread
+ // No lock here: pop does a blocking wait.
+ return l3.getDemux().getDefault().pop();
+}
+
+void SessionCore::open(uint32_t detachedLifetime) { // user thread
+ Lock l(state);
+ check(state==OPENING && !session,
+ COMMAND_INVALID, QPID_MSG("Cannot re-open a session."));
+ proxy.open(detachedLifetime);
+ waitFor(OPEN);
+}
+
+void SessionCore::close() { // user thread
+ Lock l(state);
+ check();
+ if (state==OPEN) {
+ setState(CLOSING);
+ proxy.close();
+ waitFor(CLOSED);
+ }
+ else
+ doClose(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::suspend() { // user thread
+ Lock l(state);
+ checkOpen();
+ setState(SUSPENDING);
+ proxy.suspend();
+ waitFor(SUSPENDED);
}
-void SessionCore::close()
+void SessionCore::setChannel(uint16_t ch) { channel=ch; }
+
+void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
+ // user thread
+ {
+ Lock l(state);
+ if (state==OPEN)
+ doSuspend(REPLY_SUCCESS, OK);
+ check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
+ SequenceNumber sendAck=session->resuming();
+ attaching(c);
+ proxy.resume(getId());
+ waitFor(OPEN);
+ proxy.ack(sendAck, SequenceNumberSet());
+ // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem
+ // for large replay sets.
+ SessionState::Replay replay=session->replay();
+ for (SessionState::Replay::iterator i = replay.begin();
+ i != replay.end(); ++i)
+ {
+ invariant();
+ channel.handle(*i); // Direct to channel.
+ check();
+ }
+ }
+}
+
+void SessionCore::assertOpen() const {
+ Lock l(state);
+ checkOpen();
+}
+
+// network thread
+void SessionCore::attached(const Uuid& sessionId,
+ uint32_t /*detachedLifetime*/)
{
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
- l2.close();
+ Lock l(state);
+ invariant();
+ check(state == OPENING || state == RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.attached"));
+ if (state==OPENING) { // New session
+ // FIXME aconway 2007-10-17: arbitrary ack value of 100 for
+ // client, allow configuration.
+ session=in_place<SessionState>(100, sessionId);
+ setState(OPEN);
+ }
+ else { // RESUMING
+ check(sessionId == session->getId(),
+ INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID."));
+ // Don't setState yet, wait for first incoming ack.
+ }
}
-void SessionCore::suspend() {
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
- l2.suspend();
+void SessionCore::detached() { // network thread
+ Lock l(state);
+ check(state == SUSPENDING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.detached."));
+ connection->erase(channel);
+ doSuspend(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) {
+ Lock l(state);
+ invariant();
+ check(state==OPEN || state==RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.ack"));
+ session->receivedAck(ack);
+ if (state==RESUMING) {
+ setState(OPEN);
+ }
+ invariant();
}
void SessionCore::closed(uint16_t code, const std::string& text)
-{
- out.next = 0;
- reason.code = code;
- reason.text = text;
- l2.closed();
- l3.getDemux().close();
- l3.getCompletionTracker().close();
+{ // network thread
+ Lock l(state);
+ invariant();
+ doClose(code, text);
}
-void SessionCore::checkClosed() const
-{
- // TODO: could have been a connection exception
- if(out.next == 0)
- throw ChannelException(reason.code, reason.text);
+// closed by connection
+void SessionCore::connectionClosed(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doClose(code, text);
+ } catch(...) { assert (0); }
}
-void SessionCore::open(uint32_t detachedLifetime) {
- assert(out.next);
- l2.open(detachedLifetime);
+void SessionCore::connectionBroke(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doSuspend(code, text);
+ } catch (...) { assert(0); }
}
-void SessionCore::resume(shared_ptr<ConnectionImpl> conn) {
- connection = conn;
- out.next = connection.get();
- l2.resume();
+void SessionCore::check() const { // Called with lock held.
+ invariant();
+ if (code != REPLY_SUCCESS)
+ throwReplyException(code, text);
+}
+
+void SessionCore::check(bool cond, int newCode, const std::string& msg) const {
+ check();
+ if (!cond) {
+ const_cast<SessionCore*>(this)->doClose(newCode, msg);
+ throwReplyException(code, text);
+ }
}
-Future SessionCore::send(const AMQBody& command)
-{
- checkClosed();
+void SessionCore::checkOpen() const {
+ if (state==SUSPENDED) {
+ std::string cause;
+ if (code != REPLY_SUCCESS)
+ cause=" by :"+text;
+ throw CommandInvalidException(QPID_MSG("Session is suspended" << cause));
+ }
+ check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open"));
+}
+Future SessionCore::send(const AMQBody& command)
+{
+ Lock l(state);
+ checkOpen();
command.getMethod()->setSync(sync);
-
Future f;
//any result/response listeners must be set before the command is sent
if (command.getMethod()->resultExpected()) {
@@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command)
Future SessionCore::send(const AMQBody& command, const MethodContent& content)
{
- checkClosed();
+ Lock l(state);
+ checkOpen();
//content bearing methods don't currently have responses or
//results, if that changes should follow procedure for the other
//send method impl:
return Future(l3.send(command, content));
}
+// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
- l2.handle(frame);
+ try {
+ // Cast to expose private SessionHandler functions.
+ if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ session->received(frame);
+ l3.handle(frame);
+ }
+ } catch (const ChannelException& e) {
+ QPID_LOG(error, "Channel exception:" << e.what());
+ doClose(e.code, e.what());
+ }
}
void SessionCore::handleOut(AMQFrame& frame)
{
- checkClosed();
- frame.setChannel(channel);
- out.next->handle(frame);
+ Lock l(state);
+ if (state==OPEN) {
+ if (session->sent(frame))
+ proxy.solicitAck();
+ channel.handle(frame);
+ }
+}
+
+void SessionCore::solicitAck( ) {
+ Lock l(state);
+ checkOpen();
+ proxy.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void SessionCore::flow(bool) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::flowOk(bool /*active*/) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) {
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.highWaterMark");
+}
+
+const Uuid SessionCore::getId() const {
+ if (session)
+ return session->getId();
+ throw Exception(QPID_MSG("Closed session, no ID."));
}
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index ac109e1f5c..38c72359a3 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -22,17 +22,25 @@
#ifndef _SessionCore_
#define _SessionCore_
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
-#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/Uuid.h"
-#include "SessionHandler.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/StateMonitor.h"
#include "ExecutionHandler.h"
+#include <boost/optional.hpp>
+
namespace qpid {
+namespace framing {
+class FrameSet;
+class MethodContent;
+class SequenceNumberSet;
+}
+
namespace client {
class Future;
@@ -43,60 +51,90 @@ class ConnectionImpl;
* Attaches to a SessionHandler when active, detaches
* when closed.
*/
-class SessionCore : public framing::FrameHandler::InOutHandler
+class SessionCore : public framing::FrameHandler::InOutHandler,
+ private framing::AMQP_ClientOperations::SessionHandler
{
- struct Reason
- {
- uint16_t code;
- std::string text;
- };
-
- shared_ptr<ConnectionImpl> connection;
- uint16_t channel;
- SessionHandler l2;
- ExecutionHandler l3;
- framing::Uuid uuid;
- volatile bool sync;
- Reason reason;
-
- protected:
- void handleIn(framing::AMQFrame& frame);
- void handleOut(framing::AMQFrame& frame);
-
public:
SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionCore();
framing::FrameSet::shared_ptr get();
+ const framing::Uuid getId() const;
+ uint16_t getChannel() const { return channel; }
+ void assertOpen() const;
- framing::Uuid getId() const { return uuid; }
- void setId(const framing::Uuid& id) { uuid= id; }
-
- uint16_t getChannel() const { assert(channel); return channel; }
- void setChannel(uint16_t ch) { assert(ch); channel=ch; }
-
+ // NOTE: Public functions called in user thread.
void open(uint32_t detachedLifetime);
-
- /** Closed by client code */
void close();
-
- /** Closed by peer */
- void closed(uint16_t code, const std::string& text);
-
void resume(shared_ptr<ConnectionImpl>);
void suspend();
+ void setChannel(uint16_t channel);
- void setSync(bool);
+ void setSync(bool s);
bool isSync();
ExecutionHandler& getExecution();
- void checkClosed() const;
Future send(const framing::AMQBody& command);
+
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-};
-}
-}
+ void connectionClosed(uint16_t code, const std::string& text);
+ void connectionBroke(uint16_t code, const std::string& text);
+
+ private:
+ enum State {
+ OPENING,
+ RESUMING,
+ OPEN,
+ CLOSING,
+ SUSPENDING,
+ SUSPENDED,
+ CLOSED
+ };
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef sys::StateMonitor<State, CLOSED> StateMonitor;
+ typedef StateMonitor::Set States;
+
+ inline void invariant() const;
+ inline void setState(State s);
+ inline void waitFor(State);
+ void doClose(int code, const std::string& text);
+ void doSuspend(int code, const std::string& text);
+
+ /** If there is an error, throw the exception */
+ void check(bool condition, int code, const std::string& text) const;
+ /** Throw if *error */
+ void check() const;
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ // Private functions are called by broker in network thread.
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void detached();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+ void closed(uint16_t code, const std::string& text);
+
+ void attaching(shared_ptr<ConnectionImpl>);
+ void detach(int code, const std::string& text);
+ void checkOpen() const;
+
+ int code; // Error code
+ std::string text; // Error text
+ boost::optional<framing::SessionState> session;
+ shared_ptr<ConnectionImpl> connection;
+ ExecutionHandler l3;
+ volatile bool sync;
+ framing::ChannelHandler channel;
+ framing::AMQP_ServerProxy::Session proxy;
+ mutable StateMonitor state;
+};
+}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp
deleted file mode 100644
index d3b04e5356..0000000000
--- a/cpp/src/qpid/client/SessionHandler.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/client/SessionCore.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace boost;
-
-namespace {
-// TODO aconway 2007-09-28: hack till we have multi-version support.
-ProtocolVersion version;
-}
-
-SessionHandler::SessionHandler(SessionCore& parent)
- : StateManager(CLOSED), core(parent) {}
-
-SessionHandler::~SessionHandler() {}
-
-void SessionHandler::handle(AMQFrame& frame)
-{
- AMQBody* body = frame.getBody();
- if (getState() == OPEN) {
- core.checkClosed();
- SessionClosedBody* closedBody=
- dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closedBody) {
- closed();
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- } else {
- try {
- next->handle(frame);
- }
- catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception:" << e.what());
- closed();
- AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
- core.out(f);
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- }
- }
- } else {
- if (body->getMethod())
- handleMethod(body->getMethod());
- else
- throw ConnectionException(504, "Channel not open for content.");
- }
-}
-
-void SessionHandler::attach(const AMQMethodBody& command)
-{
- setState(OPENING);
- AMQFrame f(0, command);
- core.out(f);
- std::set<int> states;
- states.insert(OPEN);
- states.insert(CLOSED);
- waitFor(states);
- if (getState() != OPEN)
- throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
-}
-
-void SessionHandler::open(uint32_t detachedLifetime) {
- attach(SessionOpenBody(version, detachedLifetime));
-}
-
-void SessionHandler::resume() {
- attach(SessionResumeBody(version, core.getId()));
-}
-
-void SessionHandler::detach(const AMQMethodBody& command)
-{
- setState(CLOSING);
- AMQFrame f(0, command);
- core.out(f);
- waitFor(CLOSED);
-}
-
-void SessionHandler::close() { detach(SessionCloseBody(version)); }
-void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
-void SessionHandler::closed() { setState(CLOSED); }
-
-void SessionHandler::handleMethod(AMQMethodBody* method)
-{
- switch (getState()) {
- case OPENING: {
- SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
- if (attached) {
- core.setId(attached->getSessionId());
- setState(OPEN);
- } else
- throw ChannelErrorException();
- break;
- }
- case CLOSING:
- if (method->isA<SessionClosedBody>() ||
- method->isA<SessionDetachedBody>())
- closed();
- break;
-
- case CLOSED:
- throw ChannelErrorException();
-
- default:
- assert(0);
- throw InternalErrorException(QPID_MSG("Internal Error."));
- }
-}
-
diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h
deleted file mode 100644
index 994b8402de..0000000000
--- a/cpp/src/qpid/client/SessionHandler.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SessionHandler_
-#define _SessionHandler_
-
-#include "StateManager.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/shared_ptr.h"
-
-namespace qpid {
-namespace client {
-class SessionCore;
-
-/**
- * Handles incoming session (L2) commands.
- */
-class SessionHandler : public framing::FrameHandler,
- private StateManager
-{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED};
- SessionCore& core;
-
- void handleMethod(framing::AMQMethodBody* method);
- void attach(const framing::AMQMethodBody&);
- void detach(const framing::AMQMethodBody&);
-
- public:
- SessionHandler(SessionCore& parent);
- ~SessionHandler();
-
- /** Incoming from broker */
- void handle(framing::AMQFrame&);
-
- void open(uint32_t detachedLifetime);
- void resume();
- void close();
- void closed();
- void suspend();
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp
index b72967c098..0cb3c6b9d4 100644
--- a/cpp/src/qpid/client/StateManager.cpp
+++ b/cpp/src/qpid/client/StateManager.cpp
@@ -60,7 +60,7 @@ void StateManager::setState(int s)
stateLock.notifyAll();
}
-int StateManager::getState()
+int StateManager::getState() const
{
Monitor::ScopedLock l(stateLock);
return state;
diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h
index fd0c1b7f86..2f8ecb772c 100644
--- a/cpp/src/qpid/client/StateManager.h
+++ b/cpp/src/qpid/client/StateManager.h
@@ -30,12 +30,12 @@ namespace client {
class StateManager
{
int state;
- sys::Monitor stateLock;
+ mutable sys::Monitor stateLock;
public:
StateManager(int initial);
void setState(int state);
- int getState();
+ int getState() const ;
void waitForStateChange(int current);
void waitFor(std::set<int> states);
void waitFor(int state);