summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-06-19 14:26:03 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-06-19 14:26:03 +0000
commit31e3e8bd8a6377a8cbcf578e268f43aaf8a0855a (patch)
treebf8a85168e920fb21b3fca7657b0e3b9197a838d /qpid/cpp
parent4322bdffcc0aa9af52f7cca3ec288f8764a6998b (diff)
downloadqpid-python-31e3e8bd8a6377a8cbcf578e268f43aaf8a0855a.tar.gz
QPID-4905: Tidy up broker::Connection
- Clean up code for accounting for sent frames - merged Connection and ConnectionState into Connection git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1494639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp118
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h94
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h115
-rw-r--r--qpid/cpp/src/qpid/broker/HandlerImpl.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h5
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp20
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h6
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h55
24 files changed, 208 insertions, 329 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index c579ee94d2..495d56321c 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1195,7 +1195,6 @@ set (qpidbroker_SOURCES
qpid/broker/Bridge.cpp
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
- qpid/broker/ConnectionState.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
qpid/broker/DirectExchange.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 3dc02da03a..fce256d171 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -482,7 +482,6 @@ libqpidcommon_la_SOURCES += \
qpid/sys/ConnectionInputHandler.h \
qpid/sys/ConnectionInputHandlerFactory.h \
qpid/sys/ConnectionOutputHandler.h \
- qpid/sys/ConnectionOutputHandlerPtr.h \
qpid/sys/CopyOnWriteArray.h \
qpid/sys/DeletionManager.h \
qpid/sys/DispatchHandle.cpp \
@@ -592,8 +591,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Connection.h \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionHandler.h \
- qpid/broker/ConnectionState.cpp \
- qpid/broker/ConnectionState.h \
qpid/broker/ConnectionToken.h \
qpid/broker/Consumer.h \
qpid/broker/Credit.h \
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 75bb98d46f..0c2655f507 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -22,7 +22,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
@@ -73,7 +72,7 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
: _queueName),
altEx(ae), persistenceId(0),
- connState(0), conn(0), initialize(init), detached(false),
+ conn(0), initialize(init), detached(false),
useExistingQueue(!_queueName.empty()),
sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
@@ -104,7 +103,6 @@ Bridge::~Bridge()
void Bridge::create(Connection& c)
{
detached = false; // Reset detached in case we are recovering.
- connState = &c;
conn = &c;
SessionHandler& sessionHandler = c.getChannel(channel);
@@ -363,7 +361,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList,
qpid::framing::FieldTable* extra_args)
{
const string& localTag = link->getBroker()->getFederationTag();
- const string& peerTag = connState->getFederationPeerTag();
+ const string& peerTag = conn->getFederationPeerTag();
if (tagList.find(peerTag) == tagList.npos) {
FieldTable bindArgs;
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index da397b8f77..54a5f1600a 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -41,7 +41,6 @@ namespace qpid {
namespace broker {
class Connection;
-class ConnectionState;
class Link;
class LinkRegistry;
@@ -135,7 +134,6 @@ class Bridge : public PersistableConfig,
std::string queueName;
std::string altEx;
mutable uint64_t persistenceId;
- ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
bool detached; // Set when session is detached.
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 518f599986..bcce6e6ca4 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/AclModule.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
@@ -77,7 +77,6 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Timer.h"
-#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/Address.h"
@@ -708,7 +707,7 @@ struct InvalidParameter : public qpid::Exception
};
void Broker::createObject(const std::string& type, const std::string& name,
- const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+ const Variant::Map& properties, bool /*strict*/, const Connection* context)
{
std::string userId;
std::string connectionId;
@@ -899,7 +898,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
void Broker::deleteObject(const std::string& type, const std::string& name,
- const Variant::Map& options, const ConnectionState* context)
+ const Variant::Map& options, const Connection* context)
{
std::string userId;
std::string connectionId;
@@ -953,7 +952,7 @@ void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmp
Manageable::status_t Broker::queryObject(const std::string& type,
const std::string& name,
Variant::Map& results,
- const ConnectionState* context)
+ const Connection* context)
{
std::string userId;
std::string connectionId;
@@ -995,7 +994,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
- const ConnectionState* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
@@ -1007,7 +1006,7 @@ Manageable::status_t Broker::getTimestampConfig(bool& receive,
}
Manageable::status_t Broker::setTimestampConfig(const bool receive,
- const ConnectionState* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 44b09239c4..11deb6f43a 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -63,7 +63,7 @@ struct Url;
namespace broker {
class AclModule;
-class ConnectionState;
+class Connection;
class ExpiryPolicy;
class Message;
struct QueueSettings;
@@ -151,20 +151,20 @@ class Broker : public sys::Runnable, public Plugin::Target,
void setLogHiresTimestamp(bool enabled);
bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
+ const qpid::types::Variant::Map& properties, bool strict, const Connection* context);
void deleteObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& options, const ConnectionState* context);
+ const qpid::types::Variant::Map& options, const Connection* context);
void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
Manageable::status_t queryObject(const std::string& type, const std::string& name,
- qpid::types::Variant::Map& results, const ConnectionState* context);
+ qpid::types::Variant::Map& results, const Connection* context);
Manageable::status_t queryQueue( const std::string& name,
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
Manageable::status_t getTimestampConfig(bool& receive,
- const ConnectionState* context);
+ const Connection* context);
Manageable::status_t setTimestampConfig(const bool receive,
- const ConnectionState* context);
+ const Connection* context);
Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
boost::shared_ptr<sys::Poller> poller;
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 6bf7282a95..a127f9bee2 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Connection.h"
+
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/SessionState.h"
@@ -26,6 +27,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Timer.h"
@@ -80,6 +82,47 @@ struct ConnectionTimeoutTask : public sys::TimerTask {
connection.abort();
}
};
+/**
+ * A ConnectionOutputHandler that delegates to another
+ * ConnectionOutputHandler. Allows you to inspect outputting frames
+ */
+class FrameInspector : public sys::ConnectionOutputHandler
+{
+public:
+ FrameInspector(ConnectionOutputHandler* p, framing::FrameHandler* i) :
+ next(p),
+ intercepter(i)
+ {
+ assert(next);
+ assert(intercepter);
+ }
+
+ void close() { next->close(); }
+ void abort() { next->abort(); }
+ void connectionEstablished() { next->connectionEstablished(); }
+ void activateOutput() { next->activateOutput(); }
+ void handle(framing::AMQFrame& f) { intercepter->handle(f); next->handle(f); }
+
+private:
+ ConnectionOutputHandler* next;
+ framing::FrameHandler* intercepter;
+};
+
+/**
+ * Chained ConnectionOutputHandler that allows outgoing frames to be
+ * tracked (for updating mgmt stats).
+ */
+class OutboundFrameTracker : public framing::FrameHandler
+{
+public:
+ OutboundFrameTracker(Connection& _con) : con(_con) {}
+ void handle(framing::AMQFrame& f)
+ {
+ con.sent(f);
+ }
+private:
+ Connection& con;
+};
Connection::Connection(ConnectionOutputHandler* out_,
Broker& broker_, const
@@ -88,19 +131,24 @@ Connection::Connection(ConnectionOutputHandler* out_,
bool link_,
uint64_t objectId_
) :
- ConnectionState(out_, broker_),
+ outboundTracker(new OutboundFrameTracker(*this)),
+ out(new FrameInspector(out_, outboundTracker.get())),
+ broker(broker_),
+ framemax(65535),
+ heartbeat(0),
+ heartbeatmax(120),
+ userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links)
+ isDefaultRealm(false),
securitySettings(external),
- adapter(*this, link_),
link(link_),
+ adapter(*this, link),
mgmtClosing(false),
mgmtId(mgmtId_),
links(broker_.getLinks()),
agent(0),
timer(broker_.getTimer()),
- objectId(objectId_),
- outboundTracker(*this)
+ objectId(objectId_)
{
- outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
assert(agent == 0);
assert(mgmtObject == 0);
@@ -112,7 +160,7 @@ Connection::Connection(ConnectionOutputHandler* out_,
mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
agent->addObject(mgmtObject, objectId);
}
- ConnectionState::setUrl(mgmtId);
+ setUrl(mgmtId);
}
}
@@ -120,15 +168,15 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- if (isOpen()) out.activateOutput();
+ if (isOpen()) out->activateOutput();
}
Connection::~Connection()
{
if (mgmtObject != 0) {
if (!link)
- agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
- QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+ agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, getUserId(), mgmtObject->get_remoteProperties()));
+ QPID_LOG_CAT(debug, model, "Delete connection. user:" << getUserId()
<< " rhost:" << mgmtId );
mgmtObject->resourceDestroy();
}
@@ -260,14 +308,19 @@ void Connection::notifyConnectionForced(const string& text)
broker.getConnectionObservers().forced(*this, text);
}
-void Connection::setUserId(const string& userId)
+void Connection::setUserId(const string& uid)
{
- ConnectionState::setUserId(userId);
+ userId = uid;
+ size_t at = userId.find('@');
+ userName = userId.substr(0, at);
+ isDefaultRealm = (
+ at!= std::string::npos &&
+ getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
}
void Connection::setUserProxyAuth(bool b)
{
- ConnectionState::setUserProxyAuth(b);
+ userProxyAuth = b;
if (mgmtObject != 0)
mgmtObject->set_userProxyAuth(b);
}
@@ -286,7 +339,22 @@ void Connection::close(connection::CloseCode code, const string& text)
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
channels.clear();
- getOutput().close();
+ out->close();
+}
+
+void Connection::activateOutput()
+{
+ out->activateOutput();
+}
+
+void Connection::addOutputTask(OutputTask* t)
+{
+ outputTasks.addOutputTask(t);
+}
+
+void Connection::removeOutputTask(OutputTask* t)
+{
+ outputTasks.removeOutputTask(t);
}
void Connection::closed(){ // Physically closed, suspend open sessions.
@@ -371,7 +439,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, stri
case _qmf::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject != 0) mgmtObject->set_closing(1);
- out.activateOutput();
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
}
@@ -435,7 +503,7 @@ void Connection::abort()
if (heartbeatTimer)
heartbeatTimer->cancel();
- out.abort();
+ out->abort();
}
void Connection::setHeartbeatInterval(uint16_t heartbeat)
@@ -451,7 +519,7 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat)
timer.add(timeoutTimer);
}
}
- out.connectionEstablished();
+ out->connectionEstablished();
}
void Connection::startLinkHeartbeatTimeoutTask() {
@@ -459,7 +527,7 @@ void Connection::startLinkHeartbeatTimeoutTask() {
linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
timer.add(linkHeartbeatTimer);
}
- out.connectionEstablished();
+ out->connectionEstablished();
}
void Connection::restartTimeout()
@@ -474,20 +542,4 @@ void Connection::restartTimeout()
bool Connection::isOpen() { return adapter.isOpen(); }
-Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {}
-void Connection::OutboundFrameTracker::close() { next->close(); }
-void Connection::OutboundFrameTracker::abort() { next->abort(); }
-void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished(); }
-void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::handle(framing::AMQFrame& f)
-{
- next->handle(f);
- con.sent(f);
-}
-void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
-{
- next = p.get();
- p.set(this);
-}
-
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index a5446a72d8..d2bc22cbe9 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -28,23 +28,29 @@
#include <queue>
#include "qpid/broker/BrokerImportExport.h"
+
#include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/RefCounted.h"
+#include "qpid/Url.h"
#include "qpid/ptr_map.h"
#include "qmf/org/apache/qpid/broker/Connection.h"
#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/scoped_ptr.hpp>
#include <boost/bind.hpp>
#include <algorithm>
namespace qpid {
namespace sys {
+class ConnectionOutputHandler;
class Timer;
class TimerTask;
}
@@ -58,10 +64,45 @@ class SessionHandler;
struct ConnectionTimeoutTask;
class Connection : public sys::ConnectionInputHandler,
- public ConnectionState,
+ public ConnectionToken, public management::Manageable,
public RefCounted
{
public:
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint16_t getHeartbeatMax() const { return heartbeatmax; }
+
+ void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
+
+ const std::string& getUserId() const { return userId; }
+
+ void setUrl(const std::string& _url) { url = _url; }
+ const std::string& getUrl() const { return url; }
+
+ void setUserProxyAuth(const bool b);
+ bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
+ bool isFederationLink() const { return federationPeerTag.size() > 0; }
+ void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
+ const std::string& getFederationPeerTag() const { return federationPeerTag; }
+ std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+ /**@return true if user is the authenticated user on this connection.
+ * If id has the default realm will also compare plain username.
+ */
+ bool isAuthenticatedUser(const std::string& id) const {
+ return (id == userId || (isDefaultRealm && id == userName));
+ }
+
+ Broker& getBroker() { return broker; }
+
+ sys::ConnectionOutputHandler& getOutput() { return *out; }
+ void activateOutput();
+ void addOutputTask(OutputTask*);
+ void removeOutputTask(OutputTask*);
+ framing::ProtocolVersion getVersion() const { return version; }
+
Connection(sys::ConnectionOutputHandler* out,
Broker& broker,
const std::string& mgmtId,
@@ -111,10 +152,8 @@ class Connection : public sys::ConnectionInputHandler,
void setUserId(const std::string& uid);
// credentials for connected client
- const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
- void setUserProxyAuth(bool b);
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
@@ -137,18 +176,39 @@ class Connection : public sys::ConnectionInputHandler,
const framing::FieldTable& getClientProperties() const { return clientProperties; }
private:
+ // Management object is used in the constructor so must be early
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
+
+ boost::scoped_ptr<framing::FrameHandler> outboundTracker;
+ boost::scoped_ptr<sys::ConnectionOutputHandler> out;
+
+ Broker& broker;
+
+ framing::ProtocolVersion version;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ uint16_t heartbeatmax;
+ std::string userId;
+ std::string url;
+ bool userProxyAuth;
+ std::string federationPeerTag;
+ std::vector<Url> knownHosts;
+ std::string userName;
+ bool isDefaultRealm;
+
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
- typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
- ConnectionHandler adapter;
const bool link;
+ ConnectionHandler adapter;
bool mgmtClosing;
const std::string mgmtId;
sys::Mutex ioCallbackLock;
std::queue<boost::function0<void> > ioCallbacks;
- qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
@@ -157,25 +217,7 @@ class Connection : public sys::ConnectionInputHandler,
uint64_t objectId;
framing::FieldTable clientProperties;
- /**
- * Chained ConnectionOutputHandler that allows outgoing frames to be
- * tracked (for updating mgmt stats).
- */
- class OutboundFrameTracker : public sys::ConnectionOutputHandler
- {
- public:
- OutboundFrameTracker(Connection&);
- void close();
- void abort();
- void connectionEstablished();
- void activateOutput();
- void handle(framing::AMQFrame&);
- void wrap(sys::ConnectionOutputHandlerPtr&);
- private:
- Connection& con;
- sys::ConnectionOutputHandler* next;
- };
- OutboundFrameTracker outboundTracker;
+friend class OutboundFrameTracker;
void sent(const framing::AMQFrame& f);
void doIoCallbacks();
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index e86dcdb086..40393f1920 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -33,6 +33,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/sys/Time.h"
#include "qpid/broker/AclModule.h"
@@ -254,7 +255,7 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
const framing::Array& /*capabilities*/, bool /*insist*/)
{
- std::vector<Url> urls = connection.broker.getKnownBrokers();
+ std::vector<Url> urls = connection.getBroker().getKnownBrokers();
framing::Array array(0x95); // str16 array
for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.cpp b/qpid/cpp/src/qpid/broker/ConnectionState.cpp
deleted file mode 100644
index c6a8317c2b..0000000000
--- a/qpid/cpp/src/qpid/broker/ConnectionState.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/ConnectionState.h"
-
-#include "qpid/broker/Broker.h"
-
-namespace qpid {
-namespace broker {
-
-void ConnectionState::setUserId(const std::string& uid) {
- userId = uid;
- size_t at = userId.find('@');
- userName = userId.substr(0, at);
- isDefaultRealm = (
- at!= std::string::npos &&
- getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
-}
-
-}}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
deleted file mode 100644
index 88ec3af62e..0000000000
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ConnectionState_
-#define _ConnectionState_
-
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandlerPtr.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/Url.h"
-
-#include <boost/function.hpp>
-#include <vector>
-
-
-namespace qpid {
-namespace broker {
-
-class Broker;
-
-class ConnectionState : public ConnectionToken, public management::Manageable
-{
- protected:
- sys::ConnectionOutputHandlerPtr out;
-
- public:
- ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
- out(o),
- broker(b),
- framemax(65535),
- heartbeat(0),
- heartbeatmax(120),
- userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links)
- isDefaultRealm(false)
- {}
-
- virtual ~ConnectionState () {}
-
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint16_t getHeartbeatMax() const { return heartbeatmax; }
-
- void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
- void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
-
- virtual void setUserId(const std::string& uid);
-
- const std::string& getUserId() const { return userId; }
-
- void setUrl(const std::string& _url) { url = _url; }
- const std::string& getUrl() const { return url; }
-
- void setUserProxyAuth(const bool b) { userProxyAuth = b; }
- bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
- bool isFederationLink() const { return federationPeerTag.size() > 0; }
- void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
- const std::string& getFederationPeerTag() const { return federationPeerTag; }
- std::vector<Url>& getKnownHosts() { return knownHosts; }
-
- /**@return true if user is the authenticated user on this connection.
- * If id has the default realm will also compare plain username.
- */
- bool isAuthenticatedUser(const std::string& id) const {
- return (id == userId || (isDefaultRealm && id == userName));
- }
-
- Broker& getBroker() { return broker; }
-
- Broker& broker;
-
- //contained output tasks
- sys::AggregateOutput outputTasks;
-
- sys::ConnectionOutputHandler& getOutput() { return out; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- virtual void requestIOProcessing (boost::function0<void>) = 0;
-
- protected:
- framing::ProtocolVersion version;
- uint32_t framemax;
- uint16_t heartbeat;
- uint16_t heartbeatmax;
- std::string userId;
- std::string url;
- bool userProxyAuth;
- std::string federationPeerTag;
- std::vector<Url> knownHosts;
- std::string userName;
- bool isDefaultRealm;
-};
-
-}}
-
-#endif
diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h
index aae636e818..72bfb1c474 100644
--- a/qpid/cpp/src/qpid/broker/HandlerImpl.h
+++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h
@@ -21,7 +21,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/SessionContext.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ class HandlerImpl {
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- ConnectionState& getConnection() { return session.getConnection(); }
+ Connection& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getConnection().getBroker(); }
};
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 1d8b117b84..a5ef8c560c 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -22,9 +22,10 @@
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
-#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecuritySettings.h"
#include <boost/format.hpp>
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index a42ed883a9..54069df591 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -42,6 +42,7 @@
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/FedOps.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -439,7 +440,7 @@ void SemanticState::disable(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
if (session.isAttached())
- session.getConnection().outputTasks.removeOutputTask(c.get());
+ session.getConnection().removeOutputTask(c.get());
}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
@@ -505,8 +506,8 @@ void SemanticState::requestDispatch()
void SemanticStateConsumerImpl::requestDispatch()
{
if (blocked) {
- parent->session.getConnection().outputTasks.addOutputTask(this);
- parent->session.getConnection().getOutput().activateOutput();
+ parent->session.getConnection().addOutputTask(this);
+ parent->session.getConnection().activateOutput();
blocked = false;
}
}
@@ -735,8 +736,8 @@ void SemanticStateConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
if (notifyEnabled) {
- parent->session.getConnection().outputTasks.addOutputTask(this);
- parent->session.getConnection().getOutput().activateOutput();
+ parent->session.getConnection().addOutputTask(this);
+ parent->session.getConnection().activateOutput();
}
}
@@ -804,16 +805,16 @@ void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
- session.getConnection().outputTasks.addOutputTask(i->second.get());
+ session.getConnection().addOutputTask(i->second.get());
}
- session.getConnection().getOutput().activateOutput();
+ session.getConnection().activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
- session.getConnection().outputTasks.removeOutputTask(i->second.get());
+ session.getConnection().removeOutputTask(i->second.get());
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 8134eba041..316301481f 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -36,7 +36,7 @@ class AMQP_ClientProxy;
namespace broker {
class Broker;
-class ConnectionState;
+class Connection;
class SessionContext : public OwnershipToken
{
@@ -44,7 +44,7 @@ class SessionContext : public OwnershipToken
virtual ~SessionContext(){}
virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual bool isAttached() const = 0;
- virtual ConnectionState& getConnection() = 0;
+ virtual Connection& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 28827ccb9e..8cbecbc6f7 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/SessionState.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include <boost/bind.hpp>
@@ -63,9 +64,9 @@ void SessionHandler::executionException(
errorListener->executionException(code, msg);
}
-ConnectionState& SessionHandler::getConnection() { return connection; }
+Connection& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
+const Connection& SessionHandler::getConnection() const { return connection; }
void SessionHandler::handleDetach() {
qpid::amqp_0_10::SessionHandler::handleDetach();
@@ -80,7 +81,7 @@ void SessionHandler::handleDetach() {
void SessionHandler::setState(const std::string& name, bool force) {
assert(!session.get());
SessionId id(connection.getUserId(), name);
- session = connection.broker.getSessionManager().attach(*this, id, force);
+ session = connection.getBroker().getSessionManager().attach(*this, id, force);
}
void SessionHandler::detaching()
@@ -102,7 +103,7 @@ void SessionHandler::readyToSend() {
void SessionHandler::attachAs(const std::string& name)
{
SessionId id(connection.getUserId(), name);
- SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig();
session.reset(new SessionState(connection.getBroker(), *this, id, config));
sendAttach(false);
}
@@ -118,7 +119,7 @@ void SessionHandler::attached(const std::string& name)
qpid::amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
- SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ SessionState::Configuration config = connection.getBroker().getSessionManager().getSessionConfig();
session.reset(new SessionState(connection.getBroker(), *this, id, config));
markReadyToSend();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index d42b7838bb..da57fb103e 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -33,7 +33,6 @@ class SessionState;
namespace broker {
class Connection;
-class ConnectionState;
class SessionState;
/**
@@ -65,8 +64,8 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- ConnectionState& getConnection();
- const ConnectionState& getConnection() const;
+ Connection& getConnection();
+ const Connection& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index a89e5ef974..ccf77413df 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
@@ -97,7 +96,7 @@ uint16_t SessionState::getChannel() const {
return handler->getChannel();
}
-ConnectionState& SessionState::getConnection() {
+Connection& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 269bba9f6e..df6ba3b17f 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -89,7 +89,7 @@ class SessionState : public qpid::SessionState,
uint16_t getChannel() const;
/** @pre isAttached() */
- ConnectionState& getConnection();
+ Connection& getConnection();
bool isLocal(const ConnectionToken* t) const;
Broker& getBroker();
diff --git a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
index c04d037a6e..9379d20b06 100644
--- a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
@@ -27,6 +27,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
#include <windows.h>
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 7b153f90ca..f36da6c1e1 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -31,7 +31,6 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
-#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 86e9d0be8d..9adc59b63d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -37,7 +37,7 @@
#include "qpid/sys/Timer.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/PollableQueue.h"
-#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
#include "qpid/types/Uuid.h"
@@ -88,7 +88,7 @@ const string keyifyNameStr(const string& name)
struct ScopedManagementContext
{
- ScopedManagementContext(const qpid::broker::ConnectionState* context)
+ ScopedManagementContext(const qpid::broker::Connection* context)
{
setManagementExecutionContext(context);
}
@@ -1286,7 +1286,7 @@ void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyT
return;
}
- string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+ string userId = ((const qpid::broker::Connection*) connToken)->getUserId();
if (acl != 0) {
map<acl::Property, string> params;
params[acl::PROP_SCHEMAPACKAGE] = packageName;
@@ -1407,7 +1407,7 @@ void ManagementAgent::handleMethodRequest (const string& body, const string& rte
return;
}
- string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+ string userId = ((const qpid::broker::Connection*) connToken)->getUserId();
if (acl != 0) {
map<acl::Property, string> params;
params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName();
@@ -1723,7 +1723,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
- ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+ ObjectId connectionRef = ((const Connection*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjects();
@@ -2206,7 +2206,7 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg)
if (acl == 0)
return true;
- string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
+ string userId = ((const qpid::broker::Connection*) msg.getPublisher())->getUserId();
params[acl::PROP_SCHEMAPACKAGE] = packageName;
params[acl::PROP_SCHEMACLASS] = className;
@@ -2276,7 +2276,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
+ ScopedManagementContext context((const qpid::broker::Connection*) msg.getPublisher());
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
if (headers && p->getAppId() == "qmf2")
{
@@ -2755,14 +2755,14 @@ ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
}
namespace {
-QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+QPID_TSS const qpid::broker::Connection* executionContext = 0;
}
-void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+void setManagementExecutionContext(const qpid::broker::Connection* ctxt)
{
executionContext = ctxt;
}
-const qpid::broker::ConnectionState* getManagementExecutionContext()
+const qpid::broker::Connection* getManagementExecutionContext()
{
return executionContext;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 6de5d1d719..2de2a232de 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -44,7 +44,7 @@
namespace qpid {
namespace broker {
-class ConnectionState;
+class Connection;
}
namespace sys {
class Timer;
@@ -379,8 +379,8 @@ private:
std::auto_ptr<EventQueue> sendQueue;
};
-void setManagementExecutionContext(const qpid::broker::ConnectionState*);
-const qpid::broker::ConnectionState* getManagementExecutionContext();
+void setManagementExecutionContext(const qpid::broker::Connection*);
+const qpid::broker::Connection* getManagementExecutionContext();
}}
#endif /*!_ManagementAgent_*/
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
deleted file mode 100644
index 053f47df49..0000000000
--- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
-#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ConnectionOutputHandler.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * A ConnectionOutputHandler that delegates to another
- * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
- * to be changed without updating all the pointers/references
- * using the ConnectionOutputHandlerPtr
- */
-class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
-{
- public:
- ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) { assert(next); }
- void set(ConnectionOutputHandler* p) { next = p; assert(next); }
- ConnectionOutputHandler* get() { return next; }
- const ConnectionOutputHandler* get() const { return next; }
-
- void close() { next->close(); }
- void abort() { next->abort(); }
- void connectionEstablished() { next->connectionEstablished(); }
- void activateOutput() { next->activateOutput(); }
- void handle(framing::AMQFrame& f) { next->handle(f); }
-
- private:
- ConnectionOutputHandler* next;
-};
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/