summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Connection.cpp24
-rw-r--r--cpp/src/qpid/client/Connection.h22
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp46
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h15
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h7
-rw-r--r--cpp/src/qpid/client/ScopedAssociation.h53
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp84
-rw-r--r--cpp/src/qpid/client/SessionCore.h54
-rw-r--r--cpp/src/qpid/client/SessionHandler.cpp121
-rw-r--r--cpp/src/qpid/client/SessionHandler.h42
11 files changed, 249 insertions, 221 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index cef076527f..2d8cbb2ddb 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -25,7 +25,7 @@
#include "Connection.h"
#include "Channel.h"
#include "Message.h"
-#include "ScopedAssociation.h"
+#include "SessionCore.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -70,16 +70,22 @@ void Connection::openChannel(Channel& channel) {
channel.open(newSession());
}
-Session Connection::newSession() {
- ChannelId id = ++channelIdCounter;
- SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
- session->open();
- return Session(assoc);
+Session Connection::newSession(uint32_t detachedLifetime) {
+ shared_ptr<SessionCore> core(
+ new SessionCore(*impl, ++channelIdCounter, max_frame_size));
+ impl->addSession(core);
+ core->open(detachedLifetime);
+ return Session(core);
}
-void Connection::close()
-{
+void Connection::resume(Session& session) {
+ shared_ptr<SessionCore> core=session.impl;
+ core->setChannel(++channelIdCounter);
+ impl->addSession(core);
+ core->resume(*impl);
+}
+
+void Connection::close() {
impl->close();
}
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index f5d6a387a9..4a9a68e8b3 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -28,7 +28,7 @@
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-
+#include "qpid/framing/Uuid.h"
namespace qpid {
@@ -122,7 +122,25 @@ class Connection
*/
void openChannel(Channel&);
- Session newSession();
+ /**
+ * Create a new session on this connection. Sessions allow
+ * multiple streams of work to be multiplexed over the same
+ * connection.
+ *
+ *@param detachedLifetime: A session may be detached from its
+ * channel, either by calling Session::suspend() or because of a
+ * network failure. The session state is perserved for
+ * detachedLifetime seconds to allow a call to resume(). After
+ * that the broker may discard the session state. Default is 0,
+ * meaning the session cannot be resumed.
+ */
+ Session newSession(uint32_t detachedLifetime=0);
+
+ /**
+ * Resume a suspendded session. A session may be resumed
+ * on a different connection to the one that created it.
+ */
+ void resume(Session& session);
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 8ab60cff50..43576d2273 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,7 +18,11 @@
* under the License.
*
*/
+#include "qpid/framing/reply_exceptions.h"
+
#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -26,7 +30,8 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false)
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
+ : connector(c), isClosed(false)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
@@ -37,22 +42,13 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), i
connector->setShutdownHandler(this);
}
-void ConnectionImpl::allocated(SessionCore::shared_ptr session)
-{
- Mutex::ScopedLock l(lock);
- if (sessions.find(session->getId()) != sessions.end()) {
- throw Exception("Id already in use.");
- }
- sessions[session->getId()] = session;
-}
-
-void ConnectionImpl::released(SessionCore::shared_ptr session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(session->getId());
- if (i != sessions.end()) {
- sessions.erase(i);
- }
+ boost::shared_ptr<SessionCore>& s = sessions[session->getChannel()];
+ if (s)
+ throw ChannelBusyException();
+ s = session;
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
@@ -62,7 +58,14 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- find(frame.getChannel())->handle(frame);
+ boost::shared_ptr<SessionCore> s;
+ {
+ Mutex::ScopedLock l(lock);
+ s = sessions[frame.getChannel()];
+ }
+ if (!s)
+ throw ChannelErrorException();
+ s->in(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -117,23 +120,12 @@ void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- Mutex::ScopedUnlock u(lock);
i->second->closed(code, text);
}
sessions.clear();
isClosed = true;
}
-SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
-{
- Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- return i->second;
-}
-
void ConnectionImpl::assertNotClosed()
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index fc786ba643..975beaa101 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -30,17 +30,18 @@
#include "qpid/sys/TimeoutHandler.h"
#include "ConnectionHandler.h"
#include "Connector.h"
-#include "SessionCore.h"
namespace qpid {
namespace client {
+class SessionCore;
+
class ConnectionImpl : public framing::FrameHandler,
- public sys::TimeoutHandler,
- public sys::ShutdownHandler
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
{
- typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap;
+ typedef std::map<uint16_t, boost::shared_ptr<SessionCore> > SessionMap;
SessionMap sessions;
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
@@ -56,14 +57,12 @@ class ConnectionImpl : public framing::FrameHandler,
void shutdown();
void signalClose(uint16_t, const std::string&);
void assertNotClosed();
- SessionCore::shared_ptr find(uint16_t);
-
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
- void allocated(SessionCore::shared_ptr);
- void released(SessionCore::shared_ptr);
+ void addSession(const boost::shared_ptr<SessionCore>&);
+
void open(const std::string& host, int port = 5672,
const std::string& uid = "guest",
const std::string& pwd = "guest",
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 4e0ee05da2..7e4926bc25 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -170,7 +170,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
if(l) {
completion.listenForResult(id, l);
}
- AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+ AMQFrame frame(0/*channel will be filled in by channel handler*/, command);
if (hasContent) {
frame.setEof(false);
}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index a3a3cde390..427c39b61f 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -38,7 +38,7 @@ namespace client {
class ExecutionHandler :
private framing::AMQP_ServerOperations::ExecutionHandler,
- public ChainableFrameHandler,
+ public framing::FrameHandler,
public Execution
{
framing::SequenceNumber incomingCounter;
@@ -66,9 +66,14 @@ class ExecutionHandler :
public:
typedef CompletionTracker::ResultListener ResultListener;
+ // Allow other classes to set the out handler.
+ framing::FrameHandler::Chain out;
+
ExecutionHandler(uint64_t maxFrameSize = 65536);
+ // Incoming handler.
void handle(framing::AMQFrame& frame);
+
framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
ResultListener=ResultListener());
diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h
deleted file mode 100644
index 861a28c0f8..0000000000
--- a/cpp/src/qpid/client/ScopedAssociation.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ScopedAssociation_
-#define _ScopedAssociation_
-
-#include "ConnectionImpl.h"
-#include "SessionCore.h"
-
-namespace qpid {
-namespace client {
-
-struct ScopedAssociation
-{
- typedef boost::shared_ptr<ScopedAssociation> shared_ptr;
-
- SessionCore::shared_ptr session;
- ConnectionImpl::shared_ptr connection;
-
- ScopedAssociation() {}
-
- ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c)
- {
- connection->allocated(session);
- }
-
- ~ScopedAssociation()
- {
- if (connection && session) connection->released(session);
- }
-};
-
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index f093e12594..3f8f9244ef 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -20,27 +20,25 @@
*/
#include "SessionCore.h"
-#include <boost/bind.hpp>
+#include "qpid/framing/constants.h"
#include "Future.h"
#include "FutureResponse.h"
#include "FutureResult.h"
+#include <boost/bind.hpp>
+
using namespace qpid::client;
using namespace qpid::framing;
-SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
- uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
+SessionCore::SessionCore(FrameHandler& out_, uint16_t ch, uint64_t maxFrameSize)
+ : channel(ch), l2(*this), l3(maxFrameSize), uuid(false), sync(false)
{
- l2.out = boost::bind(&FrameHandler::handle, out, _1);
- l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
- l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1);
- l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
+ l2.next = &l3;
+ l3.out = &out;
+ out.next = &out_;
}
-void SessionCore::open()
-{
- l2.open(id);
-}
+SessionCore::~SessionCore() {}
ExecutionHandler& SessionCore::getExecution()
{
@@ -50,6 +48,7 @@ ExecutionHandler& SessionCore::getExecution()
FrameSet::shared_ptr SessionCore::get()
{
+ checkClosed();
return l3.getDemux().getDefault().pop();
}
@@ -63,38 +62,55 @@ bool SessionCore::isSync()
return sync;
}
-void SessionCore::close()
-{
- l2.close();
- stop();
+namespace {
+struct ClosedOnExit {
+ SessionCore& core;
+ int code;
+ std::string text;
+ ClosedOnExit(SessionCore& s, int c, const std::string& t)
+ : core(s), code(c), text(t) {}
+ ~ClosedOnExit() { core.closed(code, text); }
+};
}
-void SessionCore::stop()
+void SessionCore::close()
{
- l3.getDemux().close();
- l3.getCompletionTracker().close();
+ checkClosed();
+ ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
+ l2.close();
}
-void SessionCore::handle(AMQFrame& frame)
-{
- l2.incoming(frame);
+void SessionCore::suspend() {
+ checkClosed();
+ ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
+ l2.suspend();
}
void SessionCore::closed(uint16_t code, const std::string& text)
{
- stop();
-
- isClosed = true;
+ out.next = 0;
reason.code = code;
reason.text = text;
+ l2.closed();
+ l3.getDemux().close();
+ l3.getCompletionTracker().close();
}
-void SessionCore::checkClosed()
+void SessionCore::checkClosed() const
{
- if (isClosed) {
- //TODO: could actually have been a connection exception
+ // TODO: could have been a connection exception
+ if(out.next == 0)
throw ChannelException(reason.code, reason.text);
- }
+}
+
+void SessionCore::open(uint32_t detachedLifetime) {
+ assert(out.next);
+ l2.open(detachedLifetime);
+}
+
+void SessionCore::resume(FrameHandler& out_) {
+ out.next = &out_;
+ l2.resume();
}
Future SessionCore::send(const AMQBody& command)
@@ -131,3 +147,15 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content)
//send method impl:
return Future(l3.send(command, content));
}
+
+void SessionCore::handleIn(AMQFrame& frame) {
+ l2.handle(frame);
+}
+
+void SessionCore::handleOut(AMQFrame& frame)
+{
+ checkClosed();
+ frame.setChannel(channel);
+ out.next->handle(frame);
+}
+
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 5b15a607b3..b717914206 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -28,6 +28,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/Uuid.h"
#include "SessionHandler.h"
#include "ExecutionHandler.h"
@@ -36,7 +37,12 @@ namespace client {
class Future;
-class SessionCore : public framing::FrameHandler
+/**
+ * Session implementation, sets up handler chains.
+ * Attaches to a SessionHandler when active, detaches
+ * when closed.
+ */
+class SessionCore : public framing::FrameHandler::InOutHandler
{
struct Reason
{
@@ -44,33 +50,49 @@ class SessionCore : public framing::FrameHandler
std::string text;
};
- ExecutionHandler l3;
+ uint16_t channel;
SessionHandler l2;
- const uint16_t id;
+ ExecutionHandler l3;
+ framing::Uuid uuid;
bool sync;
- bool isClosed;
Reason reason;
+
+ protected:
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ public:
+ typedef shared_ptr<SessionCore> shared_ptr;
-public:
- typedef boost::shared_ptr<SessionCore> shared_ptr;
+ SessionCore(framing::FrameHandler& out, uint16_t channel, uint64_t maxFrameSize);
+ ~SessionCore();
- SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
framing::FrameSet::shared_ptr get();
- uint16_t getId() const { return id; }
- void setSync(bool);
- bool isSync();
- void open();
+
+ framing::Uuid getId() const { return uuid; }
+ void setId(const framing::Uuid& id) { uuid= id; }
+
+ uint16_t getChannel() const { assert(channel); return channel; }
+ void setChannel(uint16_t ch) { assert(ch); channel=ch; }
+
+ void open(uint32_t detachedLifetime);
+
+ /** Closed by client code */
void close();
- void stop();
+
+ /** Closed by peer */
void closed(uint16_t code, const std::string& text);
- void checkClosed();
+
+ void resume(framing::FrameHandler& out);
+ void suspend();
+
+ void setSync(bool);
+ bool isSync();
ExecutionHandler& getExecution();
+ void checkClosed() const;
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-
- //for incoming frames:
- void handle(framing::AMQFrame& frame);
};
}
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp
index 93e628ab34..d3b04e5356 100644
--- a/cpp/src/qpid/client/SessionHandler.cpp
+++ b/cpp/src/qpid/client/SessionHandler.cpp
@@ -22,31 +22,44 @@
#include "SessionHandler.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
+namespace {
+// TODO aconway 2007-09-28: hack till we have multi-version support.
+ProtocolVersion version;
+}
+
+SessionHandler::SessionHandler(SessionCore& parent)
+ : StateManager(CLOSED), core(parent) {}
+
+SessionHandler::~SessionHandler() {}
-void SessionHandler::incoming(AMQFrame& frame)
+void SessionHandler::handle(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- SessionClosedBody* closeBody=
+ core.checkClosed();
+ SessionClosedBody* closedBody=
dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closeBody) {
- setState(CLOSED_BY_PEER);
- code = closeBody->getReplyCode();
- text = closeBody->getReplyText();
- if (onClose) {
- onClose(closeBody->getReplyCode(), closeBody->getReplyText());
- }
+ if (closedBody) {
+ closed();
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
} else {
try {
- in(frame);
- }catch(ChannelException& e){
- closed(e.code, e.toString());
+ next->handle(frame);
+ }
+ catch(const ChannelException& e){
+ QPID_LOG(error, "Channel exception:" << e.what());
+ closed();
+ AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
+ core.out(f);
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
}
}
} else {
@@ -57,69 +70,63 @@ void SessionHandler::incoming(AMQFrame& frame)
}
}
-void SessionHandler::outgoing(AMQFrame& frame)
-{
- if (getState() == OPEN) {
- frame.setChannel(id);
- out(frame);
- } else if (getState() == CLOSED) {
- throw Exception(QPID_MSG("Channel not open, can't send " << frame));
- } else if (getState() == CLOSED_BY_PEER) {
- throw ChannelException(code, text);
- }
-}
-
-void SessionHandler::open(uint16_t _id)
+void SessionHandler::attach(const AMQMethodBody& command)
{
- id = _id;
-
setState(OPENING);
- // FIXME aconway 2007-09-19: Need to get this from API.
- AMQFrame f(id, SessionOpenBody(version, 0));
- out(f);
-
+ AMQFrame f(0, command);
+ core.out(f);
std::set<int> states;
states.insert(OPEN);
- states.insert(CLOSED_BY_PEER);
+ states.insert(CLOSED);
waitFor(states);
- if (getState() != OPEN) {
- throw Exception("Failed to open channel.");
- }
+ if (getState() != OPEN)
+ throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
+}
+
+void SessionHandler::open(uint32_t detachedLifetime) {
+ attach(SessionOpenBody(version, detachedLifetime));
}
-void SessionHandler::close()
+void SessionHandler::resume() {
+ attach(SessionResumeBody(version, core.getId()));
+}
+
+void SessionHandler::detach(const AMQMethodBody& command)
{
setState(CLOSING);
- AMQFrame f(id, SessionCloseBody(version));
- out(f);
+ AMQFrame f(0, command);
+ core.out(f);
waitFor(CLOSED);
}
-void SessionHandler::closed(uint16_t code, const std::string& msg)
-{
- setState(CLOSED);
- AMQFrame f(id, SessionClosedBody(version, code, msg));
- out(f);
-}
+void SessionHandler::close() { detach(SessionCloseBody(version)); }
+void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
+void SessionHandler::closed() { setState(CLOSED); }
void SessionHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
- if (method->isA<SessionAttachedBody>()) {
- setState(OPEN);
- } else {
- throw ConnectionException(504, "Channel not opened.");
- }
- break;
+ case OPENING: {
+ SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
+ if (attached) {
+ core.setId(attached->getSessionId());
+ setState(OPEN);
+ } else
+ throw ChannelErrorException();
+ break;
+ }
case CLOSING:
- if (method->isA<SessionClosedBody>()) {
- setState(CLOSED);
- } //else just ignore it
+ if (method->isA<SessionClosedBody>() ||
+ method->isA<SessionDetachedBody>())
+ closed();
break;
+
case CLOSED:
- throw ConnectionException(504, "Channel is closed.");
+ throw ChannelErrorException();
+
default:
- throw Exception("Unexpected state encountered in SessionHandler!");
+ assert(0);
+ throw InternalErrorException(QPID_MSG("Internal Error."));
}
}
+
diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h
index e71d527406..994b8402de 100644
--- a/cpp/src/qpid/client/SessionHandler.h
+++ b/cpp/src/qpid/client/SessionHandler.h
@@ -22,36 +22,40 @@
#define _SessionHandler_
#include "StateManager.h"
-#include "ChainableFrameHandler.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/shared_ptr.h"
namespace qpid {
namespace client {
+class SessionCore;
-class SessionHandler : private StateManager, public ChainableFrameHandler
+/**
+ * Handles incoming session (L2) commands.
+ */
+class SessionHandler : public framing::FrameHandler,
+ private StateManager
{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
- framing::ProtocolVersion version;
- uint16_t id;
+ enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+ SessionCore& core;
- uint16_t code;
- std::string text;
-
void handleMethod(framing::AMQMethodBody* method);
- void closed(uint16_t code, const std::string& msg);
-
-public:
- typedef boost::function<void(uint16_t, const std::string&)> CloseListener;
-
- SessionHandler();
+ void attach(const framing::AMQMethodBody&);
+ void detach(const framing::AMQMethodBody&);
+
+ public:
+ SessionHandler(SessionCore& parent);
+ ~SessionHandler();
- void incoming(framing::AMQFrame& frame);
- void outgoing(framing::AMQFrame& frame);
+ /** Incoming from broker */
+ void handle(framing::AMQFrame&);
- void open(uint16_t id);
+ void open(uint32_t detachedLifetime);
+ void resume();
void close();
-
- CloseListener onClose;
+ void closed();
+ void suspend();
};
}}