From 7ac52b8288273de98f3e97ee8e34776a61034bfc Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 19 Sep 2007 22:34:11 +0000 Subject: AMQP 0-10 Session suppported on broker and client. Client always uses session on the wire but client::Channel API is still available until all C++ tests are migrated. Broker allows both session and channel connection to support python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577459 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 4 +- cpp/src/qpid/broker/Session.cpp | 1 + cpp/src/qpid/broker/Session.h | 6 +- cpp/src/qpid/broker/SessionHandler.cpp | 77 ++++++++++++++++++-- cpp/src/qpid/broker/SessionHandler.h | 30 ++++++-- cpp/src/qpid/client/ChannelHandler.cpp | 127 --------------------------------- cpp/src/qpid/client/ChannelHandler.h | 61 ---------------- cpp/src/qpid/client/SessionCore.cpp | 2 +- cpp/src/qpid/client/SessionCore.h | 4 +- cpp/src/qpid/client/SessionHandler.cpp | 124 ++++++++++++++++++++++++++++++++ cpp/src/qpid/client/SessionHandler.h | 59 +++++++++++++++ 11 files changed, 288 insertions(+), 207 deletions(-) delete mode 100644 cpp/src/qpid/client/ChannelHandler.cpp delete mode 100644 cpp/src/qpid/client/ChannelHandler.h create mode 100644 cpp/src/qpid/client/SessionHandler.cpp create mode 100644 cpp/src/qpid/client/SessionHandler.h (limited to 'cpp') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a4e98c5b68..06d17c5ccc 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -218,7 +218,7 @@ libqpidclient_la_SOURCES = \ qpid/client/MessageListener.cpp \ qpid/client/Correlator.cpp \ qpid/client/CompletionTracker.cpp \ - qpid/client/ChannelHandler.cpp \ + qpid/client/SessionHandler.cpp \ qpid/client/ConnectionHandler.cpp \ qpid/client/ExecutionHandler.cpp \ qpid/client/FutureCompletion.cpp \ @@ -308,7 +308,7 @@ nobase_include_HEADERS = \ qpid/client/BlockingQueue.h \ qpid/client/Correlator.h \ qpid/client/CompletionTracker.h \ - qpid/client/ChannelHandler.h \ + qpid/client/SessionHandler.h \ qpid/client/ChainableFrameHandler.h \ qpid/client/ConnectionHandler.h \ qpid/client/Execution.h \ diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index c59119140c..d379b40d3f 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -60,6 +60,7 @@ Session::Session(SessionHandler& a, uint32_t t) : adapter(&a), broker(adapter->getConnection().broker), timeout(t), + id(true), prefetchSize(0), prefetchCount(0), tagGenerator("sgen"), diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index eea36ba5fc..80f1159f04 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -34,6 +34,7 @@ #include "TxBuffer.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/Uuid.h" #include "qpid/shared_ptr.h" #include @@ -96,6 +97,7 @@ class Session : public framing::FrameHandler::Chains, SessionHandler* adapter; Broker& broker; uint32_t timeout; + framing::Uuid id; boost::ptr_vector handlers; DeliveryAdapter* deliveryAdapter; @@ -135,8 +137,10 @@ class Session : public framing::FrameHandler::Chains, Broker& getBroker() const { return broker; } - /** Session timeout. */ + /** Session timeout, aka detached-lifetime. */ uint32_t getTimeout() const { return timeout; } + /** Session ID */ + const framing::Uuid& getId() const { return id; } /** * Get named queue, never returns 0. diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index d4f8c25892..e7ef6fdb87 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -28,11 +28,13 @@ namespace qpid { namespace broker { using namespace framing; +using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), proxy(out), - ignoring(false), channelHandler(*this) {} + ignoring(false), channelHandler(*this), + useChannelClose(false) {} SessionHandler::~SessionHandler() {} @@ -50,18 +52,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m=f.getMethod(); try { - if (m && m->invoke(&channelHandler)) + if (m && (m->invoke(this) || m->invoke(&channelHandler))) return; else if (session) session->in(f); else if (!ignoring) throw ChannelErrorException( QPID_MSG("Channel " << channel << " is not open")); - } catch(const ChannelException& e){ - getProxy().getChannel().close( - e.code, e.toString(), classId(m), methodId(m)); - session.reset(); + } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. + session.reset(); + // FIXME aconway 2007-09-19: Dual-mode hack. + if (useChannelClose) + getProxy().getChannel().close( + e.code, e.toString(), classId(m), methodId(m)); + else + getProxy().getSession().closed(e.code, e.toString()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -93,6 +99,7 @@ void SessionHandler::assertClosed(const char* method) { } void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){ + parent.useChannelClose=true; parent.assertClosed("open"); parent.session.reset(new Session(parent, 0)); parent.getProxy().getChannel().openOk(); @@ -112,7 +119,7 @@ void SessionHandler::ChannelMethods::close(uint16_t replyCode, { // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids // to text names. - QPID_LOG(warning, "Received session.close("<getId(), session->getTimeout()); +} + +void SessionHandler::flow(bool /*active*/) { + // FIXME aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException(); +} + +void SessionHandler::flowOk(bool /*active*/) { + // FIXME aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException(); +} + +void SessionHandler::close() { + QPID_LOG(info, "Received session.close"); + ignoring=false; + session.reset(); + getProxy().getSession().closed(REPLY_SUCCESS, "ok"); + // No need to remove from connection map, will be re-used + // if channel is re-opened. +} + +void SessionHandler::closed(uint16_t replyCode, const string& replyText) { + // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids + // to text names. + QPID_LOG(warning, "Received session.closed: "< session; bool ignoring; ChannelMethods channelHandler; + bool useChannelClose; // FIXME aconway 2007-09-19: remove with channel. }; }} // namespace qpid::broker -#endif /*!QPID_BROKER_SESSIONADAPTER_H*/ + + +#endif /*!QPID_BROKER_SESSIONHANDLER_H*/ diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp deleted file mode 100644 index 49e7285a47..0000000000 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ChannelHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/all_method_bodies.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; - -ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {} - -void ChannelHandler::incoming(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - if (getState() == OPEN) { - ChannelCloseBody* closeBody= - dynamic_cast(body->getMethod()); - if (closeBody) { - setState(CLOSED_BY_PEER); - code = closeBody->getReplyCode(); - text = closeBody->getReplyText(); - if (onClose) { - onClose(closeBody->getReplyCode(), closeBody->getReplyText()); - } - } else { - try { - in(frame); - }catch(ChannelException& e){ - AMQMethodBody* method=body->getMethod(); - if (method) - close(e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - else - close(e.code, e.toString(), 0, 0); - } - } - } else { - if (body->getMethod()) - handleMethod(body->getMethod()); - else - throw ConnectionException(504, "Channel not open for content."); - } -} - -void ChannelHandler::outgoing(AMQFrame& frame) -{ - if (getState() == OPEN) { - frame.setChannel(id); - out(frame); - } else if (getState() == CLOSED) { - throw Exception(QPID_MSG("Channel not open, can't send " << frame)); - } else if (getState() == CLOSED_BY_PEER) { - throw ChannelException(code, text); - } -} - -void ChannelHandler::open(uint16_t _id) -{ - id = _id; - - setState(OPENING); - AMQFrame f(id, ChannelOpenBody(version)); - out(f); - - std::set states; - states.insert(OPEN); - states.insert(CLOSED_BY_PEER); - waitFor(states); - if (getState() != OPEN) { - throw Exception("Failed to open channel."); - } -} - -void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) -{ - setState(CLOSING); - AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId)); - out(f); -} - -void ChannelHandler::close() -{ - close(200, "OK", 0, 0); - waitFor(CLOSED); -} - -void ChannelHandler::handleMethod(AMQMethodBody* method) -{ - switch (getState()) { - case OPENING: - if (method->isA()) { - setState(OPEN); - } else { - throw ConnectionException(504, "Channel not opened."); - } - break; - case CLOSING: - if (method->isA()) { - setState(CLOSED); - } //else just ignore it - break; - case CLOSED: - throw ConnectionException(504, "Channel is closed."); - default: - throw Exception("Unexpected state encountered in ChannelHandler!"); - } -} diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h deleted file mode 100644 index 24c24e49c4..0000000000 --- a/cpp/src/qpid/client/ChannelHandler.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ChannelHandler_ -#define _ChannelHandler_ - -#include "StateManager.h" -#include "ChainableFrameHandler.h" -#include "qpid/framing/amqp_framing.h" - -namespace qpid { -namespace client { - -class ChannelHandler : private StateManager, public ChainableFrameHandler -{ - enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER}; - framing::ProtocolVersion version; - uint16_t id; - - uint16_t code; - std::string text; - - void handleMethod(framing::AMQMethodBody* method); - - void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId); - - -public: - typedef boost::function CloseListener; - - ChannelHandler(); - - void incoming(framing::AMQFrame& frame); - void outgoing(framing::AMQFrame& frame); - - void open(uint16_t id); - void close(); - - CloseListener onClose; -}; - -}} - -#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 3595479642..82b3e77b94 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -33,7 +33,7 @@ SessionCore::SessionCore(uint16_t _id, boost::shared_ptr { l2.out = boost::bind(&FrameHandler::handle, out, _1); l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1); - l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1); + l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1); l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 80fe13715f..5b15a607b3 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -28,7 +28,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" -#include "ChannelHandler.h" +#include "SessionHandler.h" #include "ExecutionHandler.h" namespace qpid { @@ -45,7 +45,7 @@ class SessionCore : public framing::FrameHandler }; ExecutionHandler l3; - ChannelHandler l2; + SessionHandler l2; const uint16_t id; bool sync; bool isClosed; diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp new file mode 100644 index 0000000000..3885ac437a --- /dev/null +++ b/cpp/src/qpid/client/SessionHandler.cpp @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionHandler.h" +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/all_method_bodies.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace boost; + +SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {} + +void SessionHandler::incoming(AMQFrame& frame) +{ + AMQBody* body = frame.getBody(); + if (getState() == OPEN) { + SessionClosedBody* closeBody= + dynamic_cast(body->getMethod()); + if (closeBody) { + setState(CLOSED_BY_PEER); + code = closeBody->getReplyCode(); + text = closeBody->getReplyText(); + if (onClose) { + onClose(closeBody->getReplyCode(), closeBody->getReplyText()); + } + } else { + try { + in(frame); + }catch(ChannelException& e){ + closed(e.code, e.toString()); + } + } + } else { + if (body->getMethod()) + handleMethod(body->getMethod()); + else + throw ConnectionException(504, "Channel not open for content."); + } +} + +void SessionHandler::outgoing(AMQFrame& frame) +{ + if (getState() == OPEN) { + frame.setChannel(id); + out(frame); + } else if (getState() == CLOSED) { + throw Exception(QPID_MSG("Channel not open, can't send " << frame)); + } else if (getState() == CLOSED_BY_PEER) { + throw ChannelException(code, text); + } +} + +void SessionHandler::open(uint16_t _id) +{ + id = _id; + + setState(OPENING); + AMQFrame f(id, SessionOpenBody(version)); + out(f); + + std::set states; + states.insert(OPEN); + states.insert(CLOSED_BY_PEER); + waitFor(states); + if (getState() != OPEN) { + throw Exception("Failed to open channel."); + } +} + +void SessionHandler::close() +{ + setState(CLOSING); + AMQFrame f(id, SessionCloseBody(version)); + out(f); + waitFor(CLOSED); +} + +void SessionHandler::closed(uint16_t code, const std::string& msg) +{ + setState(CLOSED); + AMQFrame f(id, SessionClosedBody(version, code, msg)); + out(f); +} + +void SessionHandler::handleMethod(AMQMethodBody* method) +{ + switch (getState()) { + case OPENING: + if (method->isA()) { + setState(OPEN); + } else { + throw ConnectionException(504, "Channel not opened."); + } + break; + case CLOSING: + if (method->isA()) { + setState(CLOSED); + } //else just ignore it + break; + case CLOSED: + throw ConnectionException(504, "Channel is closed."); + default: + throw Exception("Unexpected state encountered in SessionHandler!"); + } +} diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h new file mode 100644 index 0000000000..e71d527406 --- /dev/null +++ b/cpp/src/qpid/client/SessionHandler.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include "StateManager.h" +#include "ChainableFrameHandler.h" +#include "qpid/framing/amqp_framing.h" + +namespace qpid { +namespace client { + +class SessionHandler : private StateManager, public ChainableFrameHandler +{ + enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER}; + framing::ProtocolVersion version; + uint16_t id; + + uint16_t code; + std::string text; + + void handleMethod(framing::AMQMethodBody* method); + void closed(uint16_t code, const std::string& msg); + +public: + typedef boost::function CloseListener; + + SessionHandler(); + + void incoming(framing::AMQFrame& frame); + void outgoing(framing::AMQFrame& frame); + + void open(uint16_t id); + void close(); + + CloseListener onClose; +}; + +}} + +#endif -- cgit v1.2.1