diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-06-19 14:26:03 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-06-19 14:26:03 +0000 |
| commit | 31e3e8bd8a6377a8cbcf578e268f43aaf8a0855a (patch) | |
| tree | bf8a85168e920fb21b3fca7657b0e3b9197a838d /qpid/cpp | |
| parent | 4322bdffcc0aa9af52f7cca3ec288f8764a6998b (diff) | |
| download | qpid-python-31e3e8bd8a6377a8cbcf578e268f43aaf8a0855a.tar.gz | |
QPID-4905: Tidy up broker::Connection
- Clean up code for accounting for sent frames
- merged Connection and ConnectionState into Connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1494639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
24 files changed, 208 insertions, 329 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index c579ee94d2..495d56321c 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1195,7 +1195,6 @@ set (qpidbroker_SOURCES qpid/broker/Bridge.cpp qpid/broker/Connection.cpp qpid/broker/ConnectionHandler.cpp - qpid/broker/ConnectionState.cpp qpid/broker/DeliverableMessage.cpp qpid/broker/DeliveryRecord.cpp qpid/broker/DirectExchange.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 3dc02da03a..fce256d171 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -482,7 +482,6 @@ libqpidcommon_la_SOURCES += \ qpid/sys/ConnectionInputHandler.h \ qpid/sys/ConnectionInputHandlerFactory.h \ qpid/sys/ConnectionOutputHandler.h \ - qpid/sys/ConnectionOutputHandlerPtr.h \ qpid/sys/CopyOnWriteArray.h \ qpid/sys/DeletionManager.h \ qpid/sys/DispatchHandle.cpp \ @@ -592,8 +591,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Connection.h \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionHandler.h \ - qpid/broker/ConnectionState.cpp \ - qpid/broker/ConnectionState.h \ qpid/broker/ConnectionToken.h \ qpid/broker/Consumer.h \ qpid/broker/Credit.h \ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 75bb98d46f..0c2655f507 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -22,7 +22,6 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/FedOps.h" -#include "qpid/broker/ConnectionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" @@ -73,7 +72,7 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() : _queueName), altEx(ae), persistenceId(0), - connState(0), conn(0), initialize(init), detached(false), + conn(0), initialize(init), detached(false), useExistingQueue(!_queueName.empty()), sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { @@ -104,7 +103,6 @@ Bridge::~Bridge() void Bridge::create(Connection& c) { detached = false; // Reset detached in case we are recovering. - connState = &c; conn = &c; SessionHandler& sessionHandler = c.getChannel(channel); @@ -363,7 +361,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, qpid::framing::FieldTable* extra_args) { const string& localTag = link->getBroker()->getFederationTag(); - const string& peerTag = connState->getFederationPeerTag(); + const string& peerTag = conn->getFederationPeerTag(); if (tagList.find(peerTag) == tagList.npos) { FieldTable bindArgs; diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index da397b8f77..54a5f1600a 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -41,7 +41,6 @@ namespace qpid { namespace broker { class Connection; -class ConnectionState; class Link; class LinkRegistry; @@ -135,7 +134,6 @@ class Bridge : public PersistableConfig, std::string queueName; std::string altEx; mutable uint64_t persistenceId; - ConnectionState* connState; Connection* conn; InitializeCallback initialize; bool detached; // Set when session is detached. diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 518f599986..bcce6e6ca4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/AclModule.h" -#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" @@ -77,7 +77,6 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" #include "qpid/sys/Timer.h" -#include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/SystemInfo.h" #include "qpid/Address.h" @@ -708,7 +707,7 @@ struct InvalidParameter : public qpid::Exception }; void Broker::createObject(const std::string& type, const std::string& name, - const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) + const Variant::Map& properties, bool /*strict*/, const Connection* context) { std::string userId; std::string connectionId; @@ -899,7 +898,7 @@ void Broker::createObject(const std::string& type, const std::string& name, } void Broker::deleteObject(const std::string& type, const std::string& name, - const Variant::Map& options, const ConnectionState* context) + const Variant::Map& options, const Connection* context) { std::string userId; std::string connectionId; @@ -953,7 +952,7 @@ void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmp Manageable::status_t Broker::queryObject(const std::string& type, const std::string& name, Variant::Map& results, - const ConnectionState* context) + const Connection* context) { std::string userId; std::string connectionId; @@ -995,7 +994,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name, } Manageable::status_t Broker::getTimestampConfig(bool& receive, - const ConnectionState* context) + const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); @@ -1007,7 +1006,7 @@ Manageable::status_t Broker::getTimestampConfig(bool& receive, } Manageable::status_t Broker::setTimestampConfig(const bool receive, - const ConnectionState* context) + const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 44b09239c4..11deb6f43a 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -63,7 +63,7 @@ struct Url; namespace broker { class AclModule; -class ConnectionState; +class Connection; class ExpiryPolicy; class Message; struct QueueSettings; @@ -151,20 +151,20 @@ class Broker : public sys::Runnable, public Plugin::Target, void setLogHiresTimestamp(bool enabled); bool getLogHiresTimestamp(); void createObject(const std::string& type, const std::string& name, - const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context); + const qpid::types::Variant::Map& properties, bool strict, const Connection* context); void deleteObject(const std::string& type, const std::string& name, - const qpid::types::Variant::Map& options, const ConnectionState* context); + const qpid::types::Variant::Map& options, const Connection* context); void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty); Manageable::status_t queryObject(const std::string& type, const std::string& name, - qpid::types::Variant::Map& results, const ConnectionState* context); + qpid::types::Variant::Map& results, const Connection* context); Manageable::status_t queryQueue( const std::string& name, const std::string& userId, const std::string& connectionId, qpid::types::Variant::Map& results); Manageable::status_t getTimestampConfig(bool& receive, - const ConnectionState* context); + const Connection* context); Manageable::status_t setTimestampConfig(const bool receive, - const ConnectionState* context); + const Connection* context); Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue); void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs); boost::shared_ptr<sys::Poller> poller; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 6bf7282a95..a127f9bee2 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Connection.h" + #include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/SessionState.h" @@ -26,6 +27,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/SecuritySettings.h" #include "qpid/sys/Timer.h" @@ -80,6 +82,47 @@ struct ConnectionTimeoutTask : public sys::TimerTask { connection.abort(); } }; +/** + * A ConnectionOutputHandler that delegates to another + * ConnectionOutputHandler. Allows you to inspect outputting frames + */ +class FrameInspector : public sys::ConnectionOutputHandler +{ +public: + FrameInspector(ConnectionOutputHandler* p, framing::FrameHandler* i) : + next(p), + intercepter(i) + { + assert(next); + assert(intercepter); + } + + void close() { next->close(); } + void abort() { next->abort(); } + void connectionEstablished() { next->connectionEstablished(); } + void activateOutput() { next->activateOutput(); } + void handle(framing::AMQFrame& f) { intercepter->handle(f); next->handle(f); } + +private: + ConnectionOutputHandler* next; + framing::FrameHandler* intercepter; +}; + +/** + * Chained ConnectionOutputHandler that allows outgoing frames to be + * tracked (for updating mgmt stats). + */ +class OutboundFrameTracker : public framing::FrameHandler +{ +public: + OutboundFrameTracker(Connection& _con) : con(_con) {} + void handle(framing::AMQFrame& f) + { + con.sent(f); + } +private: + Connection& con; +}; Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const @@ -88,19 +131,24 @@ Connection::Connection(ConnectionOutputHandler* out_, bool link_, uint64_t objectId_ ) : - ConnectionState(out_, broker_), + outboundTracker(new OutboundFrameTracker(*this)), + out(new FrameInspector(out_, outboundTracker.get())), + broker(broker_), + framemax(65535), + heartbeat(0), + heartbeatmax(120), + userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links) + isDefaultRealm(false), securitySettings(external), - adapter(*this, link_), link(link_), + adapter(*this, link), mgmtClosing(false), mgmtId(mgmtId_), links(broker_.getLinks()), agent(0), timer(broker_.getTimer()), - objectId(objectId_), - outboundTracker(*this) + objectId(objectId_) { - outboundTracker.wrap(out); broker.getConnectionObservers().connection(*this); assert(agent == 0); assert(mgmtObject == 0); @@ -112,7 +160,7 @@ Connection::Connection(ConnectionOutputHandler* out_, mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10")); agent->addObject(mgmtObject, objectId); } - ConnectionState::setUrl(mgmtId); + setUrl(mgmtId); } } @@ -120,15 +168,15 @@ void Connection::requestIOProcessing(boost::function0<void> callback) { ScopedLock<Mutex> l(ioCallbackLock); ioCallbacks.push(callback); - if (isOpen()) out.activateOutput(); + if (isOpen()) out->activateOutput(); } Connection::~Connection() { if (mgmtObject != 0) { if (!link) - agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties())); - QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId() + agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, getUserId(), mgmtObject->get_remoteProperties())); + QPID_LOG_CAT(debug, model, "Delete connection. user:" << getUserId() << " rhost:" << mgmtId ); mgmtObject->resourceDestroy(); } @@ -260,14 +308,19 @@ void Connection::notifyConnectionForced(const string& text) broker.getConnectionObservers().forced(*this, text); } -void Connection::setUserId(const string& userId) +void Connection::setUserId(const string& uid) { - ConnectionState::setUserId(userId); + userId = uid; + size_t at = userId.find('@'); + userName = userId.substr(0, at); + isDefaultRealm = ( + at!= std::string::npos && + getBroker().getOptions().realm == userId.substr(at+1,userId.size())); } void Connection::setUserProxyAuth(bool b) { - ConnectionState::setUserProxyAuth(b); + userProxyAuth = b; if (mgmtObject != 0) mgmtObject->set_userProxyAuth(b); } @@ -286,7 +339,22 @@ void Connection::close(connection::CloseCode code, const string& text) //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); channels.clear(); - getOutput().close(); + out->close(); +} + +void Connection::activateOutput() +{ + out->activateOutput(); +} + +void Connection::addOutputTask(OutputTask* t) +{ + outputTasks.addOutputTask(t); +} + +void Connection::removeOutputTask(OutputTask* t) +{ + outputTasks.removeOutputTask(t); } void Connection::closed(){ // Physically closed, suspend open sessions. @@ -371,7 +439,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, stri case _qmf::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject != 0) mgmtObject->set_closing(1); - out.activateOutput(); + out->activateOutput(); status = Manageable::STATUS_OK; break; } @@ -435,7 +503,7 @@ void Connection::abort() if (heartbeatTimer) heartbeatTimer->cancel(); - out.abort(); + out->abort(); } void Connection::setHeartbeatInterval(uint16_t heartbeat) @@ -451,7 +519,7 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) timer.add(timeoutTimer); } } - out.connectionEstablished(); + out->connectionEstablished(); } void Connection::startLinkHeartbeatTimeoutTask() { @@ -459,7 +527,7 @@ void Connection::startLinkHeartbeatTimeoutTask() { linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); timer.add(linkHeartbeatTimer); } - out.connectionEstablished(); + out->connectionEstablished(); } void Connection::restartTimeout() @@ -474,20 +542,4 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } -Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} -void Connection::OutboundFrameTracker::close() { next->close(); } -void Connection::OutboundFrameTracker::abort() { next->abort(); } -void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished(); } -void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } -void Connection::OutboundFrameTracker::handle(framing::AMQFrame& f) -{ - next->handle(f); - con.sent(f); -} -void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) -{ - next = p.get(); - p.set(this); -} - }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index a5446a72d8..d2bc22cbe9 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -28,23 +28,29 @@ #include <queue> #include "qpid/broker/BrokerImportExport.h" + #include "qpid/broker/ConnectionHandler.h" -#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/ConnectionToken.h" +#include "qpid/management/Manageable.h" +#include "qpid/sys/AggregateOutput.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/SecuritySettings.h" #include "qpid/sys/Mutex.h" #include "qpid/RefCounted.h" +#include "qpid/Url.h" #include "qpid/ptr_map.h" #include "qmf/org/apache/qpid/broker/Connection.h" #include <boost/ptr_container/ptr_map.hpp> +#include <boost/scoped_ptr.hpp> #include <boost/bind.hpp> #include <algorithm> namespace qpid { namespace sys { +class ConnectionOutputHandler; class Timer; class TimerTask; } @@ -58,10 +64,45 @@ class SessionHandler; struct ConnectionTimeoutTask; class Connection : public sys::ConnectionInputHandler, - public ConnectionState, + public ConnectionToken, public management::Manageable, public RefCounted { public: + uint32_t getFrameMax() const { return framemax; } + uint16_t getHeartbeat() const { return heartbeat; } + uint16_t getHeartbeatMax() const { return heartbeatmax; } + + void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); } + void setHeartbeat(uint16_t hb) { heartbeat = hb; } + void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } + + const std::string& getUserId() const { return userId; } + + void setUrl(const std::string& _url) { url = _url; } + const std::string& getUrl() const { return url; } + + void setUserProxyAuth(const bool b); + bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids + bool isFederationLink() const { return federationPeerTag.size() > 0; } + void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } + const std::string& getFederationPeerTag() const { return federationPeerTag; } + std::vector<Url>& getKnownHosts() { return knownHosts; } + + /**@return true if user is the authenticated user on this connection. + * If id has the default realm will also compare plain username. + */ + bool isAuthenticatedUser(const std::string& id) const { + return (id == userId || (isDefaultRealm && id == userName)); + } + + Broker& getBroker() { return broker; } + + sys::ConnectionOutputHandler& getOutput() { return *out; } + void activateOutput(); + void addOutputTask(OutputTask*); + void removeOutputTask(OutputTask*); + framing::ProtocolVersion getVersion() const { return version; } + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, @@ -111,10 +152,8 @@ class Connection : public sys::ConnectionInputHandler, void setUserId(const std::string& uid); // credentials for connected client - const std::string& getUserId() const { return ConnectionState::getUserId(); } const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } - void setUserProxyAuth(bool b); void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); @@ -137,18 +176,39 @@ class Connection : public sys::ConnectionInputHandler, const framing::FieldTable& getClientProperties() const { return clientProperties; } private: + // Management object is used in the constructor so must be early + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; + + //contained output tasks + sys::AggregateOutput outputTasks; + + boost::scoped_ptr<framing::FrameHandler> outboundTracker; + boost::scoped_ptr<sys::ConnectionOutputHandler> out; + + Broker& broker; + + framing::ProtocolVersion version; + uint32_t framemax; + uint16_t heartbeat; + uint16_t heartbeatmax; + std::string userId; + std::string url; + bool userProxyAuth; + std::string federationPeerTag; + std::vector<Url> knownHosts; + std::string userName; + bool isDefaultRealm; + typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; - typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator; ChannelMap channels; qpid::sys::SecuritySettings securitySettings; - ConnectionHandler adapter; const bool link; + ConnectionHandler adapter; bool mgmtClosing; const std::string mgmtId; sys::Mutex ioCallbackLock; std::queue<boost::function0<void> > ioCallbacks; - qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; sys::Timer& timer; @@ -157,25 +217,7 @@ class Connection : public sys::ConnectionInputHandler, uint64_t objectId; framing::FieldTable clientProperties; - /** - * Chained ConnectionOutputHandler that allows outgoing frames to be - * tracked (for updating mgmt stats). - */ - class OutboundFrameTracker : public sys::ConnectionOutputHandler - { - public: - OutboundFrameTracker(Connection&); - void close(); - void abort(); - void connectionEstablished(); - void activateOutput(); - void handle(framing::AMQFrame&); - void wrap(sys::ConnectionOutputHandlerPtr&); - private: - Connection& con; - sys::ConnectionOutputHandler* next; - }; - OutboundFrameTracker outboundTracker; +friend class OutboundFrameTracker; void sent(const framing::AMQFrame& f); void doIoCallbacks(); diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index e86dcdb086..40393f1920 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -33,6 +33,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/sys/Time.h" #include "qpid/broker/AclModule.h" @@ -254,7 +255,7 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const framing::Array& /*capabilities*/, bool /*insist*/) { - std::vector<Url> urls = connection.broker.getKnownBrokers(); + std::vector<Url> urls = connection.getBroker().getKnownBrokers(); framing::Array array(0x95); // str16 array for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.cpp b/qpid/cpp/src/qpid/broker/ConnectionState.cpp deleted file mode 100644 index c6a8317c2b..0000000000 --- a/qpid/cpp/src/qpid/broker/ConnectionState.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/ConnectionState.h" - -#include "qpid/broker/Broker.h" - -namespace qpid { -namespace broker { - -void ConnectionState::setUserId(const std::string& uid) { - userId = uid; - size_t at = userId.find('@'); - userName = userId.substr(0, at); - isDefaultRealm = ( - at!= std::string::npos && - getBroker().getOptions().realm == userId.substr(at+1,userId.size())); -} - -}} diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h deleted file mode 100644 index 88ec3af62e..0000000000 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionState_ -#define _ConnectionState_ - -#include "qpid/broker/ConnectionToken.h" -#include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionOutputHandlerPtr.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/management/Manageable.h" -#include "qpid/Url.h" - -#include <boost/function.hpp> -#include <vector> - - -namespace qpid { -namespace broker { - -class Broker; - -class ConnectionState : public ConnectionToken, public management::Manageable -{ - protected: - sys::ConnectionOutputHandlerPtr out; - - public: - ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : - out(o), - broker(b), - framemax(65535), - heartbeat(0), - heartbeatmax(120), - userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links) - isDefaultRealm(false) - {} - - virtual ~ConnectionState () {} - - uint32_t getFrameMax() const { return framemax; } - uint16_t getHeartbeat() const { return heartbeat; } - uint16_t getHeartbeatMax() const { return heartbeatmax; } - - void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); } - void setHeartbeat(uint16_t hb) { heartbeat = hb; } - void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - - virtual void setUserId(const std::string& uid); - - const std::string& getUserId() const { return userId; } - - void setUrl(const std::string& _url) { url = _url; } - const std::string& getUrl() const { return url; } - - void setUserProxyAuth(const bool b) { userProxyAuth = b; } - bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids - bool isFederationLink() const { return federationPeerTag.size() > 0; } - void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } - const std::string& getFederationPeerTag() const { return federationPeerTag; } - std::vector<Url>& getKnownHosts() { return knownHosts; } - - /**@return true if user is the authenticated user on this connection. - * If id has the default realm will also compare plain username. - */ - bool isAuthenticatedUser(const std::string& id) const { - return (id == userId || (isDefaultRealm && id == userName)); - } - - Broker& getBroker() { return broker; } - - Broker& broker; - - //contained output tasks - sys::AggregateOutput outputTasks; - - sys::ConnectionOutputHandler& getOutput() { return out; } - framing::ProtocolVersion getVersion() const { return version; } - - virtual void requestIOProcessing (boost::function0<void>) = 0; - - protected: - framing::ProtocolVersion version; - uint32_t framemax; - uint16_t heartbeat; - uint16_t heartbeatmax; - std::string userId; - std::string url; - bool userProxyAuth; - std::string federationPeerTag; - std::vector<Url> knownHosts; - std::string userName; - bool isDefaultRealm; -}; - -}} - -#endif diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h index aae636e818..72bfb1c474 100644 --- a/qpid/cpp/src/qpid/broker/HandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h @@ -21,7 +21,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/broker/SessionContext.h" -#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/Connection.h" namespace qpid { namespace broker { @@ -40,7 +40,7 @@ class HandlerImpl { HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } - ConnectionState& getConnection() { return session.getConnection(); } + Connection& getConnection() { return session.getConnection(); } Broker& getBroker() { return session.getConnection().getBroker(); } }; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 1d8b117b84..a5ef8c560c 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -22,9 +22,10 @@ #include "qpid/broker/AclModule.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" -#include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldValue.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/SecuritySettings.h" #include <boost/format.hpp> diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index a42ed883a9..54069df591 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -42,6 +42,7 @@ #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" #include "qpid/broker/FedOps.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -439,7 +440,7 @@ void SemanticState::disable(ConsumerImpl::shared_ptr c) { c->disableNotify(); if (session.isAttached()) - session.getConnection().outputTasks.removeOutputTask(c.get()); + session.getConnection().removeOutputTask(c.get()); } void SemanticState::cancel(ConsumerImpl::shared_ptr c) @@ -505,8 +506,8 @@ void SemanticState::requestDispatch() void SemanticStateConsumerImpl::requestDispatch() { if (blocked) { - parent->session.getConnection().outputTasks.addOutputTask(this); - parent->session.getConnection().getOutput().activateOutput(); + parent->session.getConnection().addOutputTask(this); + parent->session.getConnection().activateOutput(); blocked = false; } } @@ -735,8 +736,8 @@ void SemanticStateConsumerImpl::notify() { Mutex::ScopedLock l(lock); if (notifyEnabled) { - parent->session.getConnection().outputTasks.addOutputTask(this); - parent->session.getConnection().getOutput().activateOutput(); + parent->session.getConnection().addOutputTask(this); + parent->session.getConnection().activateOutput(); } } @@ -804,16 +805,16 @@ void SemanticState::attached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); - session.getConnection().outputTasks.addOutputTask(i->second.get()); + session.getConnection().addOutputTask(i->second.get()); } - session.getConnection().getOutput().activateOutput(); + session.getConnection().activateOutput(); } void SemanticState::detached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); - session.getConnection().outputTasks.removeOutputTask(i->second.get()); + session.getConnection().removeOutputTask(i->second.get()); } } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 8134eba041..316301481f 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -36,7 +36,7 @@ class AMQP_ClientProxy; namespace broker { class Broker; -class ConnectionState; +class Connection; class SessionContext : public OwnershipToken { @@ -44,7 +44,7 @@ class SessionContext : public OwnershipToken virtual ~SessionContext(){} virtual bool isLocal(const ConnectionToken* t) const = 0; virtual bool isAttached() const = 0; - virtual ConnectionState& getConnection() = 0; + virtual Connection& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 28827ccb9e..8cbecbc6f7 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/SessionState.h" #include "qpid/log/Statement.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include <boost/bind.hpp> @@ -63,9 +64,9 @@ void SessionHandler::executionException( errorListener->executionException(code, msg); } -ConnectionState& SessionHandler::getConnection() { return connection; } +Connection& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } +const Connection& SessionHandler::getConnection() const { return connection; } void SessionHandler::handleDetach() { qpid::amqp_0_10::SessionHandler::handleDetach(); @@ -80,7 +81,7 @@ void SessionHandler::handleDetach() { void SessionHandler::setState(const std::string& name, bool force) { assert(!session.get()); SessionId id(connection.getUserId(), name); - session = connection.broker.getSessionManager().attach(*this, id, force); + session = connection.getBroker().getSessionManager().attach(*this, id, force); } void SessionHandler::detaching() @@ -102,7 +103,7 @@ void SessionHandler::readyToSend() { void SessionHandler::attachAs(const std::string& name) { SessionId id(connection.getUserId(), name); - SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig(); session.reset(new SessionState(connection.getBroker(), *this, id, config)); sendAttach(false); } @@ -118,7 +119,7 @@ void SessionHandler::attached(const std::string& name) qpid::amqp_0_10::SessionHandler::attached(name); } else { SessionId id(connection.getUserId(), name); - SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig(); session.reset(new SessionState(connection.getBroker(), *this, id, config)); markReadyToSend(); } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index d42b7838bb..da57fb103e 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -33,7 +33,6 @@ class SessionState; namespace broker { class Connection; -class ConnectionState; class SessionState; /** @@ -65,8 +64,8 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - ConnectionState& getConnection(); - const ConnectionState& getConnection() const; + Connection& getConnection(); + const Connection& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index a89e5ef974..ccf77413df 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -20,7 +20,6 @@ */ #include "qpid/broker/SessionState.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/ConnectionState.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SessionManager.h" @@ -97,7 +96,7 @@ uint16_t SessionState::getChannel() const { return handler->getChannel(); } -ConnectionState& SessionState::getConnection() { +Connection& SessionState::getConnection() { assert(isAttached()); return handler->getConnection(); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 269bba9f6e..df6ba3b17f 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -89,7 +89,7 @@ class SessionState : public qpid::SessionState, uint16_t getChannel() const; /** @pre isAttached() */ - ConnectionState& getConnection(); + Connection& getConnection(); bool isLocal(const ConnectionToken* t) const; Broker& getBroker(); diff --git a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp index c04d037a6e..9379d20b06 100644 --- a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp @@ -27,6 +27,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldValue.h" +#include "qpid/sys/ConnectionOutputHandler.h" #include <windows.h> diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 7b153f90ca..f36da6c1e1 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -31,7 +31,6 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueObserver.h" #include "qpid/broker/SessionContext.h" -#include "qpid/broker/ConnectionState.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 86e9d0be8d..9adc59b63d 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -37,7 +37,7 @@ #include "qpid/sys/Timer.h" #include "qpid/sys/Thread.h" #include "qpid/sys/PollableQueue.h" -#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" #include "qpid/types/Uuid.h" @@ -88,7 +88,7 @@ const string keyifyNameStr(const string& name) struct ScopedManagementContext { - ScopedManagementContext(const qpid::broker::ConnectionState* context) + ScopedManagementContext(const qpid::broker::Connection* context) { setManagementExecutionContext(context); } @@ -1286,7 +1286,7 @@ void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyT return; } - string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + string userId = ((const qpid::broker::Connection*) connToken)->getUserId(); if (acl != 0) { map<acl::Property, string> params; params[acl::PROP_SCHEMAPACKAGE] = packageName; @@ -1407,7 +1407,7 @@ void ManagementAgent::handleMethodRequest (const string& body, const string& rte return; } - string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + string userId = ((const qpid::broker::Connection*) connToken)->getUserId(); if (acl != 0) { map<acl::Property, string> params; params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); @@ -1723,7 +1723,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply string label; uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; - ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const Connection*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjects(); @@ -2206,7 +2206,7 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg) if (acl == 0) return true; - string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); + string userId = ((const qpid::broker::Connection*) msg.getPublisher())->getUserId(); params[acl::PROP_SCHEMAPACKAGE] = packageName; params[acl::PROP_SCHEMACLASS] = className; @@ -2276,7 +2276,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher()); + ScopedManagementContext context((const qpid::broker::Connection*) msg.getPublisher()); const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; if (headers && p->getAppId() == "qmf2") { @@ -2755,14 +2755,14 @@ ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents( } namespace { -QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; +QPID_TSS const qpid::broker::Connection* executionContext = 0; } -void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt) +void setManagementExecutionContext(const qpid::broker::Connection* ctxt) { executionContext = ctxt; } -const qpid::broker::ConnectionState* getManagementExecutionContext() +const qpid::broker::Connection* getManagementExecutionContext() { return executionContext; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 6de5d1d719..2de2a232de 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -44,7 +44,7 @@ namespace qpid { namespace broker { -class ConnectionState; +class Connection; } namespace sys { class Timer; @@ -379,8 +379,8 @@ private: std::auto_ptr<EventQueue> sendQueue; }; -void setManagementExecutionContext(const qpid::broker::ConnectionState*); -const qpid::broker::ConnectionState* getManagementExecutionContext(); +void setManagementExecutionContext(const qpid::broker::Connection*); +const qpid::broker::Connection* getManagementExecutionContext(); }} #endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h deleted file mode 100644 index 053f47df49..0000000000 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H -#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ConnectionOutputHandler.h" - -namespace qpid { -namespace sys { - -/** - * A ConnectionOutputHandler that delegates to another - * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler - * to be changed without updating all the pointers/references - * using the ConnectionOutputHandlerPtr - */ -class ConnectionOutputHandlerPtr : public ConnectionOutputHandler -{ - public: - ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) { assert(next); } - void set(ConnectionOutputHandler* p) { next = p; assert(next); } - ConnectionOutputHandler* get() { return next; } - const ConnectionOutputHandler* get() const { return next; } - - void close() { next->close(); } - void abort() { next->abort(); } - void connectionEstablished() { next->connectionEstablished(); } - void activateOutput() { next->activateOutput(); } - void handle(framing::AMQFrame& f) { next->handle(f); } - - private: - ConnectionOutputHandler* next; -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/ |
