diff options
| author | Alan Conway <aconway@apache.org> | 2007-09-19 22:34:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-09-19 22:34:11 +0000 |
| commit | 7ac52b8288273de98f3e97ee8e34776a61034bfc (patch) | |
| tree | e0a180875e3d097fabcb9b01d543a9d65f4f5cc2 /cpp/src/qpid/broker | |
| parent | 9759046f7a53c88d17355d75d9ca7cc38ec35657 (diff) | |
| download | qpid-python-7ac52b8288273de98f3e97ee8e34776a61034bfc.tar.gz | |
AMQP 0-10 Session suppported on broker and client.
Client always uses session on the wire but client::Channel API is
still available until all C++ tests are migrated.
Broker allows both session and channel connection to support python
tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Session.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 30 |
4 files changed, 100 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index c59119140c..d379b40d3f 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -60,6 +60,7 @@ Session::Session(SessionHandler& a, uint32_t t) : adapter(&a), broker(adapter->getConnection().broker), timeout(t), + id(true), prefetchSize(0), prefetchCount(0), tagGenerator("sgen"), diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index eea36ba5fc..80f1159f04 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -34,6 +34,7 @@ #include "TxBuffer.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/Uuid.h" #include "qpid/shared_ptr.h" #include <boost/ptr_container/ptr_vector.hpp> @@ -96,6 +97,7 @@ class Session : public framing::FrameHandler::Chains, SessionHandler* adapter; Broker& broker; uint32_t timeout; + framing::Uuid id; boost::ptr_vector<framing::FrameHandler> handlers; DeliveryAdapter* deliveryAdapter; @@ -135,8 +137,10 @@ class Session : public framing::FrameHandler::Chains, Broker& getBroker() const { return broker; } - /** Session timeout. */ + /** Session timeout, aka detached-lifetime. */ uint32_t getTimeout() const { return timeout; } + /** Session ID */ + const framing::Uuid& getId() const { return id; } /** * Get named queue, never returns 0. diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index d4f8c25892..e7ef6fdb87 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -28,11 +28,13 @@ namespace qpid { namespace broker { using namespace framing; +using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), proxy(out), - ignoring(false), channelHandler(*this) {} + ignoring(false), channelHandler(*this), + useChannelClose(false) {} SessionHandler::~SessionHandler() {} @@ -50,18 +52,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m=f.getMethod(); try { - if (m && m->invoke(&channelHandler)) + if (m && (m->invoke(this) || m->invoke(&channelHandler))) return; else if (session) session->in(f); else if (!ignoring) throw ChannelErrorException( QPID_MSG("Channel " << channel << " is not open")); - } catch(const ChannelException& e){ - getProxy().getChannel().close( - e.code, e.toString(), classId(m), methodId(m)); - session.reset(); + } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. + session.reset(); + // FIXME aconway 2007-09-19: Dual-mode hack. + if (useChannelClose) + getProxy().getChannel().close( + e.code, e.toString(), classId(m), methodId(m)); + else + getProxy().getSession().closed(e.code, e.toString()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -93,6 +99,7 @@ void SessionHandler::assertClosed(const char* method) { } void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){ + parent.useChannelClose=true; parent.assertClosed("open"); parent.session.reset(new Session(parent, 0)); parent.getProxy().getChannel().openOk(); @@ -112,7 +119,7 @@ void SessionHandler::ChannelMethods::close(uint16_t replyCode, { // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids // to text names. - QPID_LOG(warning, "Received session.close("<<replyCode<<"," + QPID_LOG(warning, "Received channel.close("<<replyCode<<"," <<replyText << "," << "classid=" <<classId<< "," << "methodid=" <<methodId); @@ -136,4 +143,60 @@ void SessionHandler::ChannelMethods::ok() //sufficient } +void SessionHandler::open(uint32_t detachedLifetime) { + assertClosed("open"); + session.reset(new Session(*this, detachedLifetime)); + getProxy().getSession().attached(session->getId(), session->getTimeout()); +} + +void SessionHandler::flow(bool /*active*/) { + // FIXME aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException(); +} + +void SessionHandler::flowOk(bool /*active*/) { + // FIXME aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException(); +} + +void SessionHandler::close() { + QPID_LOG(info, "Received session.close"); + ignoring=false; + session.reset(); + getProxy().getSession().closed(REPLY_SUCCESS, "ok"); + // No need to remove from connection map, will be re-used + // if channel is re-opened. +} + +void SessionHandler::closed(uint16_t replyCode, const string& replyText) { + // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids + // to text names. + QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); + ignoring=false; + session.reset(); + // No need to remove from connection map, will be re-used + // if channel is re-opened. +} + +void SessionHandler::resume(const Uuid& /*sessionId*/) { + assert(0); throw NotImplementedException(); +} + +void SessionHandler::suspend() { + assert(0); throw NotImplementedException(); +} + +void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, + const SequenceNumberSet& /*seenFrameSet*/) { + assert(0); throw NotImplementedException(); +} + +void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { + assert(0); throw NotImplementedException(); +} + +void SessionHandler::solicitAck() { + assert(0); throw NotImplementedException(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 219cd01396..5962ab77a8 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_SESSIONADAPTER_H -#define QPID_BROKER_SESSIONADAPTER_H +#ifndef QPID_BROKER_SESSIONHANDLER_H +#define QPID_BROKER_SESSIONHANDLER_H /* * @@ -40,7 +40,8 @@ class Session; * * SessionHandlers can be stored in a map by value. */ -class SessionHandler : public framing::FrameHandler::InOutHandler +class SessionHandler : public framing::FrameHandler::InOutHandler, + private framing::AMQP_ServerOperations::SessionHandler { public: SessionHandler(Connection&, framing::ChannelId); @@ -63,7 +64,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler void handleOut(framing::AMQFrame&); private: - // FIXME aconway 2007-08-31: Move to session methods. + // FIXME aconway 2007-08-31: Drop channel. struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler { SessionHandler& parent; @@ -81,7 +82,21 @@ class SessionHandler : public framing::FrameHandler::InOutHandler void closeOk(); }; friend class ChannelMethods; - + + /// Session methods + void open(uint32_t detachedLifetime); + void flow(bool active); + void flowOk(bool active); + void close(); + void closed(uint16_t replyCode, const std::string& replyText); + void resume(const framing::Uuid& sessionId); + void suspend(); + void ack(uint32_t cumulativeSeenMark, + const framing::SequenceNumberSet& seenFrameSet); + void highWaterMark(uint32_t lastSentMark); + void solicitAck(); + + void assertOpen(const char* method); void assertClosed(const char* method); @@ -91,8 +106,11 @@ class SessionHandler : public framing::FrameHandler::InOutHandler shared_ptr<Session> session; bool ignoring; ChannelMethods channelHandler; + bool useChannelClose; // FIXME aconway 2007-09-19: remove with channel. }; }} // namespace qpid::broker -#endif /*!QPID_BROKER_SESSIONADAPTER_H*/ + + +#endif /*!QPID_BROKER_SESSIONHANDLER_H*/ |
