diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Broker.h | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Broker.h')
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 99 |
1 files changed, 34 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 698d446bca..468da8983a 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -25,40 +25,30 @@ #include "qpid/broker/AsyncResultQueueImpl.h" #include "qpid/broker/AsyncStore.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/ConnectionFactory.h" -#include "qpid/broker/ConnectionToken.h" -#include "qpid/broker/DirectExchange.h" + +#include "qpid/DataDir.h" +#include "qpid/Options.h" +#include "qpid/Plugin.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" //#include "qpid/broker/MessageStore.h" +#include "qpid/broker/Protocol.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/SessionManager.h" #include "qpid/broker/QueueCleaner.h" #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" -#include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" #include "qpid/broker/ConfigurationObservers.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementAgent.h" -#include "qmf/org/apache/qpid/broker/Broker.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" -#include "qpid/Options.h" -#include "qpid/Plugin.h" -#include "qpid/DataDir.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Timer.h" -#include "qpid/types/Variant.h" -#include "qpid/RefCounted.h" -#include "qpid/broker/AclModule.h" +#include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" #include <boost/intrusive_ptr.hpp> + #include <string> #include <vector> @@ -67,12 +57,14 @@ namespace qpid { namespace sys { class ProtocolFactory; class Poller; +class Timer; } struct Url; namespace broker { +class AclModule; class ConnectionState; class ExpiryPolicy; class Message; @@ -103,6 +95,7 @@ class Broker : public sys::Runnable, public Plugin::Target, bool noDataDir; std::string dataDir; uint16_t port; + std::vector<std::string> listenInterfaces; int workerThreads; int connectionBacklog; bool enableMgmt; @@ -139,10 +132,15 @@ class Broker : public sys::Runnable, public Plugin::Target, void declareStandardExchange(const std::string& name, const std::string& type); void setStore (); - static void recoverComplete(const AsyncResultHandle* const); - static void configureComplete(const AsyncResultHandle* const); + static void recoverCompleteCb(const AsyncResultHandle* const); + static void configureCompleteCb(const AsyncResultHandle* const); + static Broker* thisBroker; + void recoverComplete(const AsyncResultHandle* const arh); + void configureComplete(const AsyncResultHandle* const arh); void setLogLevel(const std::string& level); std::string getLogLevel(); + void setLogHiresTimestamp(bool enabled); + bool getLogHiresTimestamp(); void createObject(const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context); void deleteObject(const std::string& type, const std::string& name, @@ -158,8 +156,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Manageable::status_t setTimestampConfig(const bool receive, const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; - sys::Timer timer; - std::auto_ptr<sys::Timer> clusterTimer; + std::auto_ptr<sys::Timer> timer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -178,7 +175,7 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ConnectionCodec::Factory> factory; DtxManager dtxManager; SessionManager sessionManager; - qmf::org::apache::qpid::broker::Broker* mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; QueueCleaner queueCleaner; @@ -187,10 +184,10 @@ class Broker : public sys::Runnable, public Plugin::Target, bool deferDeliveryImpl(const std::string& queue, const Message& msg); std::string federationTag; - bool recovery; - bool inCluster, clusterUpdatee; + bool recoveryInProgress; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConsumerFactories consumerFactories; + ProtocolRegistry protocolRegistry; mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; @@ -233,6 +230,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; } + ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; } void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } @@ -240,7 +238,7 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( uint32_t methodId, management::Args& args, std::string& text); @@ -253,19 +251,17 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, const std::string& port, + void connect(const std::string& name, + const std::string& host, const std::string& port, const std::string& transport, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* =0); - /** Create a connection to another broker. */ - void connect(const Url& url, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* =0); + boost::function2<void, int, std::string> failed); /** Move messages from one queue to another. A zero quantity means to move all messages + Return -1 if one of the queues does not exist, otherwise + the number of messages moved. */ - QPID_BROKER_EXTERN uint32_t queueMoveMessages( + QPID_BROKER_EXTERN int32_t queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, uint32_t qty, @@ -277,46 +273,17 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Expose poller so plugins can register their descriptors. */ QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller(); - boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } - void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } - /** Timer for local tasks affecting only this broker */ - sys::Timer& getTimer() { return timer; } - - /** Timer for tasks that must be synchronized if we are in a cluster */ - sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; } - QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr<sys::Timer>); + sys::Timer& getTimer() { return *timer; } boost::function<std::vector<Url> ()> getKnownBrokers; static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT; - void setRecovery(bool set) { recovery = set; } - bool getRecovery() const { return recovery; } - - /** True of this broker is part of a cluster. - * Only valid after early initialization of plugins is complete. - */ - bool isInCluster() const { return inCluster; } - void setInCluster(bool set) { inCluster = set; } - - /** True if this broker is joining a cluster and in the process of - * receiving a state update. - */ - bool isClusterUpdatee() const { return clusterUpdatee; } - void setClusterUpdatee(bool set) { clusterUpdatee = set; } + bool inRecovery() const { return recoveryInProgress; } management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - /** - * Never true in a stand-alone broker. In a cluster, return true - * to defer delivery of messages deliveredg in a cluster-unsafe - * context. - *@return true if delivery of a message should be deferred. - */ - boost::function<bool (const std::string& queue, - const Message& msg)> deferDelivery; - bool isAuthenticating ( ) { return config.auth; } bool isTimestamping() { return config.timestampRcvMsgs; } @@ -371,8 +338,10 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&); + QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; } /** Information identifying this system */ boost::shared_ptr<const System> getSystem() const { return systemObject; } + friend class StatusCheckThread; }; }} |