From b3992a21f76f38797723f70168ecda6be21e3719 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 24 Sep 2007 21:15:46 +0000 Subject: 2007-09-24 Alan Conway * cpp/src/qpid/broker/SessionManager.cpp: Manage suspended sessions. Replaces SuspendedSessions. * cpp/src/qpid/broker/SessionState.cpp: Work with SessionManager. * cpp/src/qpid/broker/SessionHandler.cpp: Owns SessionState. * cpp/src/qpid/broker/Connection.h, .cpp: Owns session handlers. * cpp/src/qpid/broker/Broker.h: Added SessionManager member. * cpp/src/Makefile.am: Added broker/SessionManager.cpp * amqp.0-10-preview.xml: Added session-busy and channel-busy constants. * cpp/src/tests/.valgrind.supp-default: Added suppresssions for F7. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578975 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.h | 4 ++ cpp/src/qpid/broker/Connection.cpp | 9 ++-- cpp/src/qpid/broker/Connection.h | 6 +-- cpp/src/qpid/broker/SessionHandler.cpp | 12 +++-- cpp/src/qpid/broker/SessionHandler.h | 9 ++-- cpp/src/qpid/broker/SessionManager.cpp | 84 +++++++++++++++++++++++++++++++ cpp/src/qpid/broker/SessionManager.h | 83 ++++++++++++++++++++++++++++++ cpp/src/qpid/broker/SessionState.cpp | 15 ++++-- cpp/src/qpid/broker/SessionState.h | 18 ++++--- cpp/src/qpid/broker/SuspendedSessions.cpp | 60 ---------------------- cpp/src/qpid/broker/SuspendedSessions.h | 61 ---------------------- 11 files changed, 210 insertions(+), 151 deletions(-) create mode 100644 cpp/src/qpid/broker/SessionManager.cpp create mode 100644 cpp/src/qpid/broker/SessionManager.h delete mode 100644 cpp/src/qpid/broker/SuspendedSessions.cpp delete mode 100644 cpp/src/qpid/broker/SuspendedSessions.h (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 1ccc3564f5..acb059451a 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -29,6 +29,7 @@ #include "ExchangeRegistry.h" #include "MessageStore.h" #include "QueueRegistry.h" +#include "SessionManager.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -103,6 +104,8 @@ class Broker : public sys::Runnable, public Plugin::Target uint64_t getStagingThreshold() { return stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } + SessionManager& getSessionManager() { return sessionManager; } + private: sys::Acceptor& getAcceptor() const; @@ -117,6 +120,7 @@ class Broker : public sys::Runnable, public Plugin::Target ConnectionFactory factory; DtxManager dtxManager; HandlerUpdaters handlerUpdaters; + SessionManager sessionManager; static MessageStore* createStore(const Options& config); }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index b1b8abe4fd..a21db0f603 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -94,13 +94,12 @@ void Connection::closeChannel(uint16_t id) { } SessionHandler& Connection::getChannel(ChannelId id) { - boost::optional& ch = channels[id]; - if (!ch) { - ch = boost::in_place(boost::ref(*this), id); + ChannelMap::iterator i=channels.find(id); + if (i == channels.end()) { + i = channels.insert(id, new SessionHandler(*this, id)).first; } - return *ch; + return *i; } - }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 4f64873dc3..11f5545144 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -38,7 +38,7 @@ #include "ConnectionHandler.h" #include "SessionHandler.h" -#include +#include namespace qpid { namespace broker { @@ -82,9 +82,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); private: - - // Use boost::optional to allow default-constructed uninitialized entries in the map. - typedef std::map >ChannelMap; + typedef boost::ptr_map ChannelMap; typedef std::vector::iterator queue_iterator; framing::ProtocolVersion version; diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 13e5c247be..ecbffed465 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -35,7 +35,7 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch), proxy(out), ignoring(false) {} -SessionHandler::~SessionHandler() {} +SessionHandler::~SessionHandler() { } namespace { ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } @@ -53,7 +53,7 @@ void SessionHandler::handleIn(AMQFrame& f) { try { if (m && m->invoke(this)) return; - else if (session) + else if (session.get()) session->in(f); else if (!ignoring) throw ChannelErrorException( @@ -76,7 +76,7 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertOpen(const char* method) { - if (!session) + if (!session.get()) throw ChannelErrorException( QPID_MSG(""< state( + connection.broker.getSessionManager().open(*this, detachedLifetime)); + session.reset(state.release()); getProxy().getSession().attached(session->getId(), session->getTimeout()); } diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 5ae5b5cfee..aec3731dc0 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -27,6 +27,8 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include + namespace qpid { namespace broker { @@ -37,11 +39,10 @@ class SessionState; * A SessionHandler is associated with each active channel. It * receives incoming frames, handles session commands and manages the * association between the channel and a session. - * - * SessionHandlers can be stored in a map by value. */ class SessionHandler : public framing::FrameHandler::InOutHandler, - private framing::AMQP_ServerOperations::SessionHandler + private framing::AMQP_ServerOperations::SessionHandler, + private boost::noncopyable { public: SessionHandler(Connection&, framing::ChannelId); @@ -84,8 +85,8 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, Connection& connection; const framing::ChannelId channel; framing::AMQP_ClientProxy proxy; - shared_ptr session; bool ignoring; + std::auto_ptr session; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp new file mode 100644 index 0000000000..20dd29bc31 --- /dev/null +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionManager.h" +#include "SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/memory.h" + +#include + +#include +#include + +namespace qpid { +namespace broker { + +using namespace sys; +using namespace framing; +using std::make_pair; + +SessionManager::SessionManager() {} + +SessionManager::~SessionManager() {} + +std::auto_ptr SessionManager::open( + SessionHandler& h, uint32_t timeout_) +{ + Mutex::ScopedLock l(lock); + std::auto_ptr session(new SessionState(*this, h, timeout_)); + active.insert(session->getId()); + return session; +} + +void SessionManager::suspend(std::auto_ptr session) { + Mutex::ScopedLock l(lock); + session->expiry = AbsTime(now(),session->getTimeout()); + suspended.push_back(session.release()); // In expiry order + eraseExpired(); +} + +std::auto_ptr SessionManager::resume(const Uuid& id) { + Mutex::ScopedLock l(lock); + eraseExpired(); + if (active.find(id) != active.end()) + throw SessionBusyException( + QPID_MSG("Session already active: " << id)); + Suspended::iterator i = std::find_if( + suspended.begin(), suspended.end(), + boost::bind(std::equal_to(), id, boost::bind(&SessionState::getId, _1)) + ); + if (i == suspended.end()) + throw InvalidArgumentException( + QPID_MSG("No suspended session with id=" << id)); + active.insert(id); + return make_auto_ptr(suspended.release(i).release()); +} + +void SessionManager::eraseExpired() { + // Called with lock held. + Suspended::iterator i = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less(), boost::bind(&SessionState::expiry, _1), _2)); + suspended.erase(suspended.begin(), i); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h new file mode 100644 index 0000000000..8225f04798 --- /dev/null +++ b/cpp/src/qpid/broker/SessionManager.h @@ -0,0 +1,83 @@ +#ifndef QPID_BROKER_SESSIONMANAGER_H +#define QPID_BROKER_SESSIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace qpid { +namespace broker { + +class SessionState; +class SessionHandler; + +/** + * Create and manage SessionState objects. + */ +class SessionManager : private boost::noncopyable { + public: + SessionManager(); + ~SessionManager(); + /** Open a new active session, caller takes ownership */ + std::auto_ptr open( + SessionHandler& h, uint32_t timeout_); + + /** Suspend a session, start it's timeout counter. + * The factory takes ownership. + */ + void suspend(std::auto_ptr session); + + /** Resume a suspended session. + *@throw Exception if timed out or non-existant. + */ + std::auto_ptr resume(const framing::Uuid& id); + + private: + typedef boost::ptr_vector Suspended; + typedef std::set Active; + + sys::Mutex lock; + Suspended suspended; + Active active; + + void eraseExpired(); + friend class SessionState; // removes deleted sessions from active set. +}; + + + +}} // namespace qpid::broker + + + + + +#endif /*!QPID_BROKER_SESSIONMANAGER_H*/ diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index acfb3bfea8..b56aa106a3 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -19,18 +19,20 @@ * */ #include "SessionState.h" +#include "SessionManager.h" #include "SessionHandler.h" #include "Connection.h" #include "Broker.h" #include "SemanticHandler.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace broker { using namespace framing; -SessionState::SessionState(SessionHandler& h, uint32_t timeout_) - : handler(&h), id(true), timeout(timeout_), +SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_) + : factory(f), handler(&h), id(true), timeout(timeout_), broker(h.getConnection().broker), version(h.getConnection().getVersion()) { @@ -45,6 +47,11 @@ SessionState::SessionState(SessionHandler& h, uint32_t timeout_) broker.update(handler->getChannel(), *this); } +SessionState::~SessionState() { + // Remove ID from active session list. + factory.active.erase(getId()); +} + SessionHandler& SessionState::getHandler() { assert(isAttached()); return *handler; @@ -53,9 +60,7 @@ SessionHandler& SessionState::getHandler() { AMQP_ClientProxy& SessionState::getProxy() { return getHandler().getProxy(); } - /** Convenience for: getHandler()->getConnection() - *@pre getHandler() != 0 - */ + Connection& SessionState::getConnection() { return getHandler().getConnection(); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 1334cc7005..58944c5968 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -25,10 +25,13 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Time.h" #include #include +#include +#include namespace qpid { @@ -39,6 +42,7 @@ class AMQP_ClientProxy; namespace broker { class SessionHandler; +class SessionManager; class Broker; class Connection; @@ -59,9 +63,7 @@ class SessionState : public framing::FrameHandler::Chains, private boost::noncopyable { public: - /** SessionState for a newly opened connection. */ - SessionState(SessionHandler& h, uint32_t timeout_); - + ~SessionState(); bool isAttached() { return handler; } /** @pre isAttached() */ @@ -77,19 +79,21 @@ class SessionState : public framing::FrameHandler::Chains, uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } - private: - friend class SessionHandler; // Only SessionHandler can attach/detach - void detach() { handler=0; } - void attach(SessionHandler& h) { handler = &h; } + /** Only SessionManager can open sessions */ + SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_); + SessionManager& factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; + sys::AbsTime expiry; // Used by SessionManager. Broker& broker; boost::ptr_vector chain; framing::ProtocolVersion version; + + friend class SessionManager; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SuspendedSessions.cpp b/cpp/src/qpid/broker/SuspendedSessions.cpp deleted file mode 100644 index 1cd0710f1e..0000000000 --- a/cpp/src/qpid/broker/SuspendedSessions.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "SuspendedSessions.h" -#include - -namespace qpid { -namespace broker { - -using namespace framing; -using namespace sys; -using namespace boost; -typedef Mutex::ScopedLock Lock; - -void SuspendedSessions::suspend(SessionState& s) { - Lock l(lock); - assert(s.state == SessionState::ACTIVE); - if (s.timeout == 0) - s.state = SessionState::CLOSED; - else { - AbsTime expires(now(), Duration(s.timeout*TIME_SEC)); - suspended.insert(std::make_pair(expires, s)); - s.state = SessionState::SUSPENDED; - } -} - -SessionState SuspendedSessions::resume(const Uuid& id) -{ - Lock l(lock); - Map::iterator notExpired = suspended.lower_bound(now()); - suspended.erase(suspended.begin(), notExpired); - Map::iterator i = suspended.begin(); - while (i != suspended.end() && i->second.getId() != id) - ++i; - if (i == suspended.end()) - throw Exception(QPID_MSG("Session timed out or invalid ID: " << id)); - return i->second; -} - -}} // namespace qpid::broker - - - diff --git a/cpp/src/qpid/broker/SuspendedSessions.h b/cpp/src/qpid/broker/SuspendedSessions.h deleted file mode 100644 index d3a0c17050..0000000000 --- a/cpp/src/qpid/broker/SuspendedSessions.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef QPID_BROKER_SUSPENDEDSESSIONS_H -#define QPID_BROKER_SUSPENDEDSESSIONS_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/SessionState.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Mutex.h" - -#include - -namespace qpid { -namespace broker { - -/** - * Thread safe collection of suspended sessions. - * Every session is owned either by a connection's SessionHandler - * or by the SuspendedSessions. - */ -class SuspendedSessions { - typedef std::multimap Map; - - sys::Mutex lock; - Map suspended; - - public: - /** Suspend a session, start it's timeout counter.*/ - void suspend(SessionState& session); - - /** Resume a suspended session. - *@throw Exception if timed out or non-existant. - */ - SessionState resume(const framing::Uuid& id); -}; - - - -}} // namespace qpid::broker - - - -#endif /*!QPID_BROKER_SUSPENDEDSESSIONS_H*/ -- cgit v1.2.1