summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-19 22:34:11 +0000
committerAlan Conway <aconway@apache.org>2007-09-19 22:34:11 +0000
commit7ac52b8288273de98f3e97ee8e34776a61034bfc (patch)
treee0a180875e3d097fabcb9b01d543a9d65f4f5cc2 /cpp/src/qpid/broker
parent9759046f7a53c88d17355d75d9ca7cc38ec35657 (diff)
downloadqpid-python-7ac52b8288273de98f3e97ee8e34776a61034bfc.tar.gz
AMQP 0-10 Session suppported on broker and client.
Client always uses session on the wire but client::Channel API is still available until all C++ tests are migrated. Broker allows both session and channel connection to support python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Session.cpp1
-rw-r--r--cpp/src/qpid/broker/Session.h6
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp77
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h30
4 files changed, 100 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index c59119140c..d379b40d3f 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -60,6 +60,7 @@ Session::Session(SessionHandler& a, uint32_t t)
: adapter(&a),
broker(adapter->getConnection().broker),
timeout(t),
+ id(true),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
index eea36ba5fc..80f1159f04 100644
--- a/cpp/src/qpid/broker/Session.h
+++ b/cpp/src/qpid/broker/Session.h
@@ -34,6 +34,7 @@
#include "TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
@@ -96,6 +97,7 @@ class Session : public framing::FrameHandler::Chains,
SessionHandler* adapter;
Broker& broker;
uint32_t timeout;
+ framing::Uuid id;
boost::ptr_vector<framing::FrameHandler> handlers;
DeliveryAdapter* deliveryAdapter;
@@ -135,8 +137,10 @@ class Session : public framing::FrameHandler::Chains,
Broker& getBroker() const { return broker; }
- /** Session timeout. */
+ /** Session timeout, aka detached-lifetime. */
uint32_t getTimeout() const { return timeout; }
+ /** Session ID */
+ const framing::Uuid& getId() const { return id; }
/**
* Get named queue, never returns 0.
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index d4f8c25892..e7ef6fdb87 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -28,11 +28,13 @@
namespace qpid {
namespace broker {
using namespace framing;
+using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
connection(c), channel(ch), proxy(out),
- ignoring(false), channelHandler(*this) {}
+ ignoring(false), channelHandler(*this),
+ useChannelClose(false) {}
SessionHandler::~SessionHandler() {}
@@ -50,18 +52,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m=f.getMethod();
try {
- if (m && m->invoke(&channelHandler))
+ if (m && (m->invoke(this) || m->invoke(&channelHandler)))
return;
else if (session)
session->in(f);
else if (!ignoring)
throw ChannelErrorException(
QPID_MSG("Channel " << channel << " is not open"));
- } catch(const ChannelException& e){
- getProxy().getChannel().close(
- e.code, e.toString(), classId(m), methodId(m));
- session.reset();
+ } catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
+ session.reset();
+ // FIXME aconway 2007-09-19: Dual-mode hack.
+ if (useChannelClose)
+ getProxy().getChannel().close(
+ e.code, e.toString(), classId(m), methodId(m));
+ else
+ getProxy().getSession().closed(e.code, e.toString());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -93,6 +99,7 @@ void SessionHandler::assertClosed(const char* method) {
}
void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+ parent.useChannelClose=true;
parent.assertClosed("open");
parent.session.reset(new Session(parent, 0));
parent.getProxy().getChannel().openOk();
@@ -112,7 +119,7 @@ void SessionHandler::ChannelMethods::close(uint16_t replyCode,
{
// FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
// to text names.
- QPID_LOG(warning, "Received session.close("<<replyCode<<","
+ QPID_LOG(warning, "Received channel.close("<<replyCode<<","
<<replyText << ","
<< "classid=" <<classId<< ","
<< "methodid=" <<methodId);
@@ -136,4 +143,60 @@ void SessionHandler::ChannelMethods::ok()
//sufficient
}
+void SessionHandler::open(uint32_t detachedLifetime) {
+ assertClosed("open");
+ session.reset(new Session(*this, detachedLifetime));
+ getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
+void SessionHandler::flow(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::flowOk(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::close() {
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session.reset();
+ getProxy().getSession().closed(REPLY_SUCCESS, "ok");
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
+ // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+ // to text names.
+ QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+ ignoring=false;
+ session.reset();
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::resume(const Uuid& /*sessionId*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::suspend() {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
+ const SequenceNumberSet& /*seenFrameSet*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::solicitAck() {
+ assert(0); throw NotImplementedException();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 219cd01396..5962ab77a8 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSIONADAPTER_H
-#define QPID_BROKER_SESSIONADAPTER_H
+#ifndef QPID_BROKER_SESSIONHANDLER_H
+#define QPID_BROKER_SESSIONHANDLER_H
/*
*
@@ -40,7 +40,8 @@ class Session;
*
* SessionHandlers can be stored in a map by value.
*/
-class SessionHandler : public framing::FrameHandler::InOutHandler
+class SessionHandler : public framing::FrameHandler::InOutHandler,
+ private framing::AMQP_ServerOperations::SessionHandler
{
public:
SessionHandler(Connection&, framing::ChannelId);
@@ -63,7 +64,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler
void handleOut(framing::AMQFrame&);
private:
- // FIXME aconway 2007-08-31: Move to session methods.
+ // FIXME aconway 2007-08-31: Drop channel.
struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler {
SessionHandler& parent;
@@ -81,7 +82,21 @@ class SessionHandler : public framing::FrameHandler::InOutHandler
void closeOk();
};
friend class ChannelMethods;
-
+
+ /// Session methods
+ void open(uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void close();
+ void closed(uint16_t replyCode, const std::string& replyText);
+ void resume(const framing::Uuid& sessionId);
+ void suspend();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+
+
void assertOpen(const char* method);
void assertClosed(const char* method);
@@ -91,8 +106,11 @@ class SessionHandler : public framing::FrameHandler::InOutHandler
shared_ptr<Session> session;
bool ignoring;
ChannelMethods channelHandler;
+ bool useChannelClose; // FIXME aconway 2007-09-19: remove with channel.
};
}} // namespace qpid::broker
-#endif /*!QPID_BROKER_SESSIONADAPTER_H*/
+
+
+#endif /*!QPID_BROKER_SESSIONHANDLER_H*/