summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-19 22:34:11 +0000
committerAlan Conway <aconway@apache.org>2007-09-19 22:34:11 +0000
commit7ac52b8288273de98f3e97ee8e34776a61034bfc (patch)
treee0a180875e3d097fabcb9b01d543a9d65f4f5cc2 /cpp/src/qpid/broker/SessionHandler.cpp
parent9759046f7a53c88d17355d75d9ca7cc38ec35657 (diff)
downloadqpid-python-7ac52b8288273de98f3e97ee8e34776a61034bfc.tar.gz
AMQP 0-10 Session suppported on broker and client.
Client always uses session on the wire but client::Channel API is still available until all C++ tests are migrated. Broker allows both session and channel connection to support python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp77
1 files changed, 70 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index d4f8c25892..e7ef6fdb87 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -28,11 +28,13 @@
namespace qpid {
namespace broker {
using namespace framing;
+using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
connection(c), channel(ch), proxy(out),
- ignoring(false), channelHandler(*this) {}
+ ignoring(false), channelHandler(*this),
+ useChannelClose(false) {}
SessionHandler::~SessionHandler() {}
@@ -50,18 +52,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m=f.getMethod();
try {
- if (m && m->invoke(&channelHandler))
+ if (m && (m->invoke(this) || m->invoke(&channelHandler)))
return;
else if (session)
session->in(f);
else if (!ignoring)
throw ChannelErrorException(
QPID_MSG("Channel " << channel << " is not open"));
- } catch(const ChannelException& e){
- getProxy().getChannel().close(
- e.code, e.toString(), classId(m), methodId(m));
- session.reset();
+ } catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
+ session.reset();
+ // FIXME aconway 2007-09-19: Dual-mode hack.
+ if (useChannelClose)
+ getProxy().getChannel().close(
+ e.code, e.toString(), classId(m), methodId(m));
+ else
+ getProxy().getSession().closed(e.code, e.toString());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -93,6 +99,7 @@ void SessionHandler::assertClosed(const char* method) {
}
void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+ parent.useChannelClose=true;
parent.assertClosed("open");
parent.session.reset(new Session(parent, 0));
parent.getProxy().getChannel().openOk();
@@ -112,7 +119,7 @@ void SessionHandler::ChannelMethods::close(uint16_t replyCode,
{
// FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
// to text names.
- QPID_LOG(warning, "Received session.close("<<replyCode<<","
+ QPID_LOG(warning, "Received channel.close("<<replyCode<<","
<<replyText << ","
<< "classid=" <<classId<< ","
<< "methodid=" <<methodId);
@@ -136,4 +143,60 @@ void SessionHandler::ChannelMethods::ok()
//sufficient
}
+void SessionHandler::open(uint32_t detachedLifetime) {
+ assertClosed("open");
+ session.reset(new Session(*this, detachedLifetime));
+ getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
+void SessionHandler::flow(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::flowOk(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::close() {
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session.reset();
+ getProxy().getSession().closed(REPLY_SUCCESS, "ok");
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
+ // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+ // to text names.
+ QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+ ignoring=false;
+ session.reset();
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::resume(const Uuid& /*sessionId*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::suspend() {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
+ const SequenceNumberSet& /*seenFrameSet*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::solicitAck() {
+ assert(0); throw NotImplementedException();
+}
+
}} // namespace qpid::broker