diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Vhost.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Vhost.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 1 |
8 files changed, 60 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 65ed38b731..5064320efb 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -91,11 +91,21 @@ void Bridge::create(ConnectionState& c) string queue = "bridge_queue_"; queue += Uuid(true).str(); FieldTable queueSettings; + if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.id", localTag); } + if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.exclude", peerTag); } bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d401436d38..910c774958 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/ProtocolFactory.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Dispatcher.h" @@ -136,7 +137,7 @@ Broker::Broker(const Broker::Options& conf) : managementAgentSingleton(!config.enableMgmt), store(0), acl(0), - dataDir(conf.noDataDir ? std::string () : conf.dataDir), + dataDir(conf.noDataDir ? std::string() : conf.dataDir), links(this), factory(new ConnectionFactory(*this)), dtxManager(timer), @@ -148,40 +149,43 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { - if(conf.enableMgmt){ + if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); managementAgent = managementAgentSingleton.getInstance(); ((ManagementBroker*) managementAgent)->configure - (dataDir.isEnabled () ? dataDir.getPath () : string (), + (dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); - _qmf::Package packageInitializer (managementAgent); - - System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); - systemObject = System::shared_ptr (system); - - mgmtObject = new _qmf::Broker (managementAgent, this, system, conf.port); - mgmtObject->set_workerThreads (conf.workerThreads); - mgmtObject->set_maxConns (conf.maxConnections); - mgmtObject->set_connBacklog (conf.connectionBacklog); - mgmtObject->set_stagingThreshold (conf.stagingThreshold); - mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); - mgmtObject->set_version (qpid::version); + _qmf::Package packageInitializer(managementAgent); + + System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string()); + systemObject = System::shared_ptr(system); + + mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port); + mgmtObject->set_workerThreads(conf.workerThreads); + mgmtObject->set_maxConns(conf.maxConnections); + mgmtObject->set_connBacklog(conf.connectionBacklog); + mgmtObject->set_stagingThreshold(conf.stagingThreshold); + mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) mgmtObject->set_dataDir(dataDir.getPath()); else mgmtObject->clr_dataDir(); - managementAgent->addObject (mgmtObject, 0x1000000000000002LL); + managementAgent->addObject(mgmtObject, 0x1000000000000002LL); // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the // management schema correct. - Vhost* vhost = new Vhost (this); - vhostObject = Vhost::shared_ptr (vhost); - - queues.setParent (vhost); - exchanges.setParent (vhost); - links.setParent (vhost); + Vhost* vhost = new Vhost(this); + vhostObject = Vhost::shared_ptr(vhost); + framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid()); + federationTag = uuid.str(); + vhostObject->setFederationTag(federationTag); + + queues.setParent(vhost); + exchanges.setParent(vhost); + links.setParent(vhost); } QueuePolicy::setDefaultMaxSize(conf.queueLimit); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a7496f1510..cdb4c4a034 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -128,6 +128,7 @@ class Broker : public sys::Runnable, public Plugin::Target, std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); + std::string federationTag; public: @@ -168,6 +169,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } + const std::string& getFederationTag() const { return federationTag; } management::ManagementObject* GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index c47037cf9c..dd1ac20dd2 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -42,6 +42,7 @@ const std::string ANONYMOUS = "ANONYMOUS"; const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; const std::string QPID_FED_LINK = "qpid.fed_link"; +const std::string QPID_FED_TAG = "qpid.federation_tag"; } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId) @@ -83,6 +84,8 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : FieldTable properties; Array mechanisms(0x95); + properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); + authenticator = SaslAuthenticator::createAuthenticator(c); authenticator->getMechanisms(mechanisms); @@ -104,12 +107,13 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper { authenticator->start(mechanism, response); connection.setFederationLink(clientProperties.get(QPID_FED_LINK)); - if (connection.isFederationLink()){ + connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); + if (connection.isFederationLink()) { if (acl && !acl->authorise(connection.getUserId(),acl::CREATE,acl::LINK,"")){ - client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); - return; - } - QPID_LOG(info, "Connection is a federation link"); + client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); + return; + } + QPID_LOG(info, "Connection is a federation link"); } } @@ -154,15 +158,18 @@ void ConnectionHandler::Handler::closeOk(){ } -void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, +void ConnectionHandler::Handler::start(const FieldTable& serverProperties, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { string mechanism = connection.getAuthMechanism(); string response = connection.getAuthCredentials(); + connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); + FieldTable ft; ft.setInt(QPID_FED_LINK,1); + ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); server.startOk(ft, mechanism, response, en_US); } diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index c04bd46f72..fd69157dbd 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -68,6 +68,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setFederationLink(bool b) { federationLink = b; } bool isFederationLink() const { return federationLink; } + void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); } + const string& getFederationPeerTag() const { return federationPeerTag; } Broker& getBroker() { return broker; } @@ -90,6 +92,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string userId; string url; bool federationLink; + string federationPeerTag; }; }} diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index 4d9d3bb604..c030d4c51f 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -38,3 +38,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) } } +void Vhost::setFederationTag(const std::string& tag) +{ + mgmtObject->set_federationTag(tag); +} diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h index 59c0d4f959..ef59362e4d 100644 --- a/cpp/src/qpid/broker/Vhost.h +++ b/cpp/src/qpid/broker/Vhost.h @@ -41,6 +41,7 @@ class Vhost : public management::Manageable management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } + void setFederationTag(const std::string& tag); }; }} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 86aa31b8ed..3564d462df 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -67,6 +67,7 @@ public: bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + const framing::Uuid& getUuid() const { return uuid; } // Stubs for remote management agent calls void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } |
