From 633c33f224f3196f3f9bd80bd2e418d8143fea06 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 4 May 2012 15:39:19 +0000 Subject: QPID-3858: Updated branch - merged from trunk r.1333987 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/AclModule.h | 191 ++++++- cpp/src/qpid/broker/Bridge.cpp | 15 +- cpp/src/qpid/broker/Bridge.h | 13 +- cpp/src/qpid/broker/Broker.cpp | 7 +- cpp/src/qpid/broker/Broker.h | 87 +-- cpp/src/qpid/broker/Connection.cpp | 16 +- cpp/src/qpid/broker/Connection.h | 5 + cpp/src/qpid/broker/ConnectionHandler.cpp | 1 + cpp/src/qpid/broker/ConnectionHandler.h | 1 - cpp/src/qpid/broker/DirectExchange.cpp | 3 +- cpp/src/qpid/broker/DirectExchange.h | 4 +- cpp/src/qpid/broker/DtxManager.cpp | 29 +- cpp/src/qpid/broker/DtxManager.h | 3 + cpp/src/qpid/broker/DtxWorkRecord.cpp | 11 +- cpp/src/qpid/broker/Exchange.cpp | 28 +- cpp/src/qpid/broker/Exchange.h | 2 +- cpp/src/qpid/broker/ExchangeRegistry.cpp | 3 + cpp/src/qpid/broker/ExchangeRegistry.h | 6 +- cpp/src/qpid/broker/Fairshare.cpp | 1 + cpp/src/qpid/broker/FanOutExchange.cpp | 2 +- cpp/src/qpid/broker/FanOutExchange.h | 4 +- cpp/src/qpid/broker/HeadersExchange.cpp | 3 +- cpp/src/qpid/broker/HeadersExchange.h | 4 +- cpp/src/qpid/broker/LegacyLVQ.cpp | 29 +- cpp/src/qpid/broker/LegacyLVQ.h | 1 + cpp/src/qpid/broker/Link.cpp | 263 +++++++-- cpp/src/qpid/broker/Link.h | 58 +- cpp/src/qpid/broker/LinkRegistry.cpp | 31 +- cpp/src/qpid/broker/LinkRegistry.h | 110 ++-- cpp/src/qpid/broker/Message.cpp | 8 +- cpp/src/qpid/broker/MessageDeque.cpp | 20 +- cpp/src/qpid/broker/MessageDeque.h | 7 +- cpp/src/qpid/broker/MessageGroupManager.cpp | 92 ++- cpp/src/qpid/broker/MessageGroupManager.h | 19 +- cpp/src/qpid/broker/MessageMap.cpp | 78 ++- cpp/src/qpid/broker/MessageMap.h | 2 +- cpp/src/qpid/broker/PriorityQueue.cpp | 118 ++-- cpp/src/qpid/broker/PriorityQueue.h | 27 +- cpp/src/qpid/broker/Queue.cpp | 667 ++++++++++++++-------- cpp/src/qpid/broker/Queue.h | 147 +++-- cpp/src/qpid/broker/QueueListeners.cpp | 4 - cpp/src/qpid/broker/QueueListeners.h | 7 +- cpp/src/qpid/broker/QueuedMessage.cpp | 34 ++ cpp/src/qpid/broker/QueuedMessage.h | 3 + cpp/src/qpid/broker/SaslAuthenticator.cpp | 1 + cpp/src/qpid/broker/SemanticState.cpp | 4 +- cpp/src/qpid/broker/SemanticState.h | 65 ++- cpp/src/qpid/broker/SessionAdapter.cpp | 51 +- cpp/src/qpid/broker/SessionAdapter.h | 4 +- cpp/src/qpid/broker/SessionHandler.cpp | 5 + cpp/src/qpid/broker/SessionHandler.h | 10 +- cpp/src/qpid/broker/TopicExchange.cpp | 3 +- cpp/src/qpid/broker/TopicExchange.h | 4 +- cpp/src/qpid/broker/windows/SaslAuthenticator.cpp | 1 + 54 files changed, 1528 insertions(+), 784 deletions(-) create mode 100644 cpp/src/qpid/broker/QueuedMessage.cpp (limited to 'cpp/src/qpid/broker') diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h index e32ff266b9..ff9281b6fc 100644 --- a/cpp/src/qpid/broker/AclModule.h +++ b/cpp/src/qpid/broker/AclModule.h @@ -22,6 +22,7 @@ #include "qpid/RefCounted.h" +#include "qpid/Exception.h" #include #include #include @@ -32,17 +33,81 @@ namespace qpid { namespace acl { - enum ObjectType {OBJ_QUEUE, OBJ_EXCHANGE, OBJ_BROKER, OBJ_LINK, - OBJ_METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list - enum Action {ACT_CONSUME, ACT_PUBLISH, ACT_CREATE, ACT_ACCESS, ACT_BIND, - ACT_UNBIND, ACT_DELETE, ACT_PURGE, ACT_UPDATE, - ACTIONSIZE}; // ACTIONSIZE must be last in list - enum Property {PROP_NAME, PROP_DURABLE, PROP_OWNER, PROP_ROUTINGKEY, - PROP_PASSIVE, PROP_AUTODELETE, PROP_EXCLUSIVE, PROP_TYPE, - PROP_ALTERNATE, PROP_QUEUENAME, PROP_SCHEMAPACKAGE, - PROP_SCHEMACLASS, PROP_POLICYTYPE, PROP_MAXQUEUESIZE, - PROP_MAXQUEUECOUNT}; - enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG}; + // Interface enumerations. + // These enumerations define enum lists and implied text strings + // to match. They are used in two areas: + // 1. In the ACL specifications in the ACL file, file parsing, and + // internal rule storage. + // 2. In the authorize interface in the rest of the broker where + // code requests the ACL module to authorize an action. + + // ObjectType shared between ACL spec and ACL authorise interface + enum ObjectType { + OBJ_QUEUE, + OBJ_EXCHANGE, + OBJ_BROKER, + OBJ_LINK, + OBJ_METHOD, + OBJECTSIZE }; // OBJECTSIZE must be last in list + + // Action shared between ACL spec and ACL authorise interface + enum Action { + ACT_CONSUME, + ACT_PUBLISH, + ACT_CREATE, + ACT_ACCESS, + ACT_BIND, + ACT_UNBIND, + ACT_DELETE, + ACT_PURGE, + ACT_UPDATE, + ACTIONSIZE }; // ACTIONSIZE must be last in list + + // Property used in ACL authorize interface + enum Property { + PROP_NAME, + PROP_DURABLE, + PROP_OWNER, + PROP_ROUTINGKEY, + PROP_AUTODELETE, + PROP_EXCLUSIVE, + PROP_TYPE, + PROP_ALTERNATE, + PROP_QUEUENAME, + PROP_SCHEMAPACKAGE, + PROP_SCHEMACLASS, + PROP_POLICYTYPE, + PROP_MAXQUEUESIZE, + PROP_MAXQUEUECOUNT }; + + // Property used in ACL spec file + // Note for properties common to file processing/rule storage and to + // broker rule lookups the identical enum values are used. + enum SpecProperty { + SPECPROP_NAME = PROP_NAME, + SPECPROP_DURABLE = PROP_DURABLE, + SPECPROP_OWNER = PROP_OWNER, + SPECPROP_ROUTINGKEY = PROP_ROUTINGKEY, + SPECPROP_AUTODELETE = PROP_AUTODELETE, + SPECPROP_EXCLUSIVE = PROP_EXCLUSIVE, + SPECPROP_TYPE = PROP_TYPE, + SPECPROP_ALTERNATE = PROP_ALTERNATE, + SPECPROP_QUEUENAME = PROP_QUEUENAME, + SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE, + SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS, + SPECPROP_POLICYTYPE = PROP_POLICYTYPE, + + SPECPROP_MAXQUEUESIZELOWERLIMIT, + SPECPROP_MAXQUEUESIZEUPPERLIMIT, + SPECPROP_MAXQUEUECOUNTLOWERLIMIT, + SPECPROP_MAXQUEUECOUNTUPPERLIMIT }; + +// AclResult shared between ACL spec and ACL authorise interface + enum AclResult { + ALLOW, + ALLOWLOG, + DENY, + DENYLOG }; } // namespace acl @@ -54,14 +119,25 @@ namespace broker { public: - // effienty turn off ACL on message transfer. + // Some ACLs are invoked on every message transfer. + // doTransferAcl pervents time consuming ACL calls on a per-message basis. virtual bool doTransferAcl()=0; - virtual bool authorise(const std::string& id, const acl::Action& action, const acl::ObjectType& objType, const std::string& name, + virtual bool authorise( + const std::string& id, + const acl::Action& action, + const acl::ObjectType& objType, + const std::string& name, std::map* params=0)=0; - virtual bool authorise(const std::string& id, const acl::Action& action, const acl::ObjectType& objType, const std::string& ExchangeName, - const std::string& RoutingKey)=0; - // create specilied authorise methods for cases that need faster matching as needed. + + virtual bool authorise( + const std::string& id, + const acl::Action& action, + const acl::ObjectType& objType, + const std::string& ExchangeName, + const std::string& RoutingKey)=0; + + // Add specialized authorise() methods as required. virtual ~AclModule() {}; }; @@ -79,7 +155,7 @@ namespace acl { if (str.compare("broker") == 0) return OBJ_BROKER; if (str.compare("link") == 0) return OBJ_LINK; if (str.compare("method") == 0) return OBJ_METHOD; - throw str; + throw qpid::Exception(str); } static inline std::string getObjectTypeStr(const ObjectType o) { switch (o) { @@ -102,7 +178,7 @@ namespace acl { if (str.compare("delete") == 0) return ACT_DELETE; if (str.compare("purge") == 0) return ACT_PURGE; if (str.compare("update") == 0) return ACT_UPDATE; - throw str; + throw qpid::Exception(str); } static inline std::string getActionStr(const Action a) { switch (a) { @@ -124,7 +200,6 @@ namespace acl { if (str.compare("durable") == 0) return PROP_DURABLE; if (str.compare("owner") == 0) return PROP_OWNER; if (str.compare("routingkey") == 0) return PROP_ROUTINGKEY; - if (str.compare("passive") == 0) return PROP_PASSIVE; if (str.compare("autodelete") == 0) return PROP_AUTODELETE; if (str.compare("exclusive") == 0) return PROP_EXCLUSIVE; if (str.compare("type") == 0) return PROP_TYPE; @@ -134,8 +209,8 @@ namespace acl { if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS; if (str.compare("policytype") == 0) return PROP_POLICYTYPE; if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE; - if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT; - throw str; + if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT; + throw qpid::Exception(str); } static inline std::string getPropertyStr(const Property p) { switch (p) { @@ -143,7 +218,6 @@ namespace acl { case PROP_DURABLE: return "durable"; case PROP_OWNER: return "owner"; case PROP_ROUTINGKEY: return "routingkey"; - case PROP_PASSIVE: return "passive"; case PROP_AUTODELETE: return "autodelete"; case PROP_EXCLUSIVE: return "exclusive"; case PROP_TYPE: return "type"; @@ -153,17 +227,61 @@ namespace acl { case PROP_SCHEMACLASS: return "schemaclass"; case PROP_POLICYTYPE: return "policytype"; case PROP_MAXQUEUESIZE: return "maxqueuesize"; - case PROP_MAXQUEUECOUNT: return "maxqueuecount"; + case PROP_MAXQUEUECOUNT: return "maxqueuecount"; default: assert(false); // should never get here } return ""; } + static inline SpecProperty getSpecProperty(const std::string& str) { + if (str.compare("name") == 0) return SPECPROP_NAME; + if (str.compare("durable") == 0) return SPECPROP_DURABLE; + if (str.compare("owner") == 0) return SPECPROP_OWNER; + if (str.compare("routingkey") == 0) return SPECPROP_ROUTINGKEY; + if (str.compare("autodelete") == 0) return SPECPROP_AUTODELETE; + if (str.compare("exclusive") == 0) return SPECPROP_EXCLUSIVE; + if (str.compare("type") == 0) return SPECPROP_TYPE; + if (str.compare("alternate") == 0) return SPECPROP_ALTERNATE; + if (str.compare("queuename") == 0) return SPECPROP_QUEUENAME; + if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE; + if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS; + if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE; + if (str.compare("queuemaxsizelowerlimit") == 0) return SPECPROP_MAXQUEUESIZELOWERLIMIT; + if (str.compare("queuemaxsizeupperlimit") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT; + if (str.compare("queuemaxcountlowerlimit") == 0) return SPECPROP_MAXQUEUECOUNTLOWERLIMIT; + if (str.compare("queuemaxcountupperlimit") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT; + // Allow old names in ACL file as aliases for newly-named properties + if (str.compare("maxqueuesize") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT; + if (str.compare("maxqueuecount") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT; + throw qpid::Exception(str); + } + static inline std::string getPropertyStr(const SpecProperty p) { + switch (p) { + case SPECPROP_NAME: return "name"; + case SPECPROP_DURABLE: return "durable"; + case SPECPROP_OWNER: return "owner"; + case SPECPROP_ROUTINGKEY: return "routingkey"; + case SPECPROP_AUTODELETE: return "autodelete"; + case SPECPROP_EXCLUSIVE: return "exclusive"; + case SPECPROP_TYPE: return "type"; + case SPECPROP_ALTERNATE: return "alternate"; + case SPECPROP_QUEUENAME: return "queuename"; + case SPECPROP_SCHEMAPACKAGE: return "schemapackage"; + case SPECPROP_SCHEMACLASS: return "schemaclass"; + case SPECPROP_POLICYTYPE: return "policytype"; + case SPECPROP_MAXQUEUESIZELOWERLIMIT: return "queuemaxsizelowerlimit"; + case SPECPROP_MAXQUEUESIZEUPPERLIMIT: return "queuemaxsizeupperlimit"; + case SPECPROP_MAXQUEUECOUNTLOWERLIMIT: return "queuemaxcountlowerlimit"; + case SPECPROP_MAXQUEUECOUNTUPPERLIMIT: return "queuemaxcountupperlimit"; + default: assert(false); // should never get here + } + return ""; + } static inline AclResult getAclResult(const std::string& str) { if (str.compare("allow") == 0) return ALLOW; if (str.compare("allow-log") == 0) return ALLOWLOG; if (str.compare("deny") == 0) return DENY; if (str.compare("deny-log") == 0) return DENYLOG; - throw str; + throw qpid::Exception(str); } static inline std::string getAclResultStr(const AclResult r) { switch (r) { @@ -187,8 +305,11 @@ namespace acl { typedef boost::shared_ptr objectMapPtr; typedef std::map propMap; typedef propMap::const_iterator propMapItr; + typedef std::map specPropMap; + typedef specPropMap::const_iterator specPropMapItr; - // This map contains the legal combinations of object/action/properties found in an ACL file + // This map contains the legal combinations of object/action/properties + // found in an ACL file static void loadValidationMap(objectMapPtr& map) { if (!map.get()) return; map->clear(); @@ -199,7 +320,6 @@ namespace acl { propSetPtr p1(new propSet); p1->insert(PROP_TYPE); p1->insert(PROP_ALTERNATE); - p1->insert(PROP_PASSIVE); p1->insert(PROP_DURABLE); propSetPtr p2(new propSet); @@ -224,7 +344,6 @@ namespace acl { propSetPtr p4(new propSet); p4->insert(PROP_ALTERNATE); - p4->insert(PROP_PASSIVE); p4->insert(PROP_DURABLE); p4->insert(PROP_EXCLUSIVE); p4->insert(PROP_AUTODELETE); @@ -260,21 +379,31 @@ namespace acl { map->insert(objectPair(OBJ_METHOD, a4)); } - static std::string propertyMapToString(const std::map* params) { + // + // properyMapToString + // + template + static std::string propertyMapToString( + const std::map* params) + { std::ostringstream ss; ss << "{"; if (params) { - for (propMapItr pMItr = params->begin(); pMItr != params->end(); pMItr++) { - ss << " " << getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second; + for (typename std::map::const_iterator + pMItr = params->begin(); pMItr != params->end(); pMItr++) + { + ss << " " << getPropertyStr((T) pMItr-> first) + << "=" << pMItr->second; } } ss << " }"; return ss.str(); } + }; - + }} // namespace qpid::acl #endif // QPID_ACLMODULE_ACL_H diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 9a1f4be468..5b531e4636 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), - initialize(init) + initialize(init), detached(false) { std::stringstream title; title << id << "_" << name; @@ -85,11 +85,14 @@ Bridge::~Bridge() void Bridge::create(Connection& c) { + detached = false; // Reset detached in case we are recovering. connState = &c; conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); SessionHandler& sessionHandler = c.getChannel(id); + sessionHandler.setDetachedCallback( + boost::bind(&Bridge::sessionDetached, shared_from_this())); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); @@ -179,12 +182,6 @@ void Bridge::destroy() listener(this); } -bool Bridge::isSessionReady() const -{ - SessionHandler& sessionHandler = conn->getChannel(id); - return sessionHandler.ready(); -} - void Bridge::setPersistenceId(uint64_t pId) const { persistenceId = pId; @@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() const return link->getBroker()->getFederationTag(); } +void Bridge::sessionDetached() { + detached = true; +} + }} diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index b849b11ba8..32b9fd1781 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -33,6 +33,7 @@ #include "qmf/org/apache/qpid/broker/Bridge.h" #include +#include #include namespace qpid { @@ -44,7 +45,10 @@ class Link; class LinkRegistry; class SessionHandler; -class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge +class Bridge : public PersistableConfig, + public management::Manageable, + public Exchange::DynamicBridge, + public boost::enable_shared_from_this { public: typedef boost::shared_ptr shared_ptr; @@ -63,7 +67,7 @@ public: void destroy(); bool isDurable() { return args.i_durable; } - bool isSessionReady() const; + bool isDetached() const { return detached; } management::ManagementObject* GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, @@ -90,6 +94,9 @@ public: const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } private: + // Callback when the bridge's session is detached. + void sessionDetached(); + struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } void handle(framing::AMQFrame& frame); @@ -112,7 +119,7 @@ private: ConnectionState* connState; Connection* conn; InitializeCallback initialize; - + bool detached; // Set when session is detached. bool resetProxy(); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0fd31580f6..f20cce18a2 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -111,6 +111,7 @@ Broker::Options::Options(const std::string& name) : maxConnections(500), connectionBacklog(10), enableMgmt(1), + mgmtPublish(1), mgmtPubInterval(10), queueCleanInterval(60*10),//10 minutes auth(SaslAuthenticator::available()), @@ -148,6 +149,7 @@ Broker::Options::Options(const std::string& name) : ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") // FIXME aconway 2012-02-13: consistent treatment of values in SECONDS @@ -213,7 +215,7 @@ Broker::Broker(const Broker::Options& conf) : try { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); - managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), + managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish, conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); @@ -228,6 +230,7 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_mgmtPublish(conf.mgmtPublish); mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) mgmtObject->set_dataDir(dataDir.getPath()); @@ -885,7 +888,6 @@ std::pair, bool> Broker::createQueue( if (acl) { std::map params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); @@ -956,7 +958,6 @@ std::pair Broker::createExchange( std::map params; params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index cff38eecdd..135b9340f9 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -106,6 +106,7 @@ public: int maxConnections; int connectionBacklog; bool enableMgmt; + bool mgmtPublish; uint16_t mgmtPubInterval; uint16_t queueCleanInterval; bool auth; @@ -206,7 +207,7 @@ public: ConsumerFactories consumerFactories; public: - virtual ~Broker(); + QPID_BROKER_EXTERN virtual ~Broker(); QPID_BROKER_EXTERN Broker(const Options& configuration); static QPID_BROKER_EXTERN boost::intrusive_ptr create(const Options& configuration); @@ -218,16 +219,16 @@ public: * port, which will be different if the configured port is * 0. */ - virtual uint16_t getPort(const std::string& name) const; + QPID_BROKER_EXTERN virtual uint16_t getPort(const std::string& name) const; /** * Run the broker. Implements Runnable::run() so the broker * can be run in a separate thread. */ - virtual void run(); + QPID_BROKER_EXTERN virtual void run(); /** Shut down the broker */ - virtual void shutdown(); + QPID_BROKER_EXTERN virtual void shutdown(); QPID_BROKER_EXTERN void setStore (boost::shared_ptr& store); void setAsyncStore(boost::shared_ptr& asyncStore); @@ -248,14 +249,14 @@ public: SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - management::ManagementObject* GetManagementObject (void) const; - management::Manageable* GetVhostObject (void) const; - management::Manageable::status_t ManagementMethod (uint32_t methodId, - management::Args& args, - std::string& text); + QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const; + QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; + QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( + uint32_t methodId, management::Args& args, std::string& text); /** Add to the broker's protocolFactorys */ - void registerProtocolFactory(const std::string& name, boost::shared_ptr); + QPID_BROKER_EXTERN void registerProtocolFactory( + const std::string& name, boost::shared_ptr); /** Accept connections */ QPID_BROKER_EXTERN void accept(); @@ -273,15 +274,17 @@ public: /** Move messages from one queue to another. A zero quantity means to move all messages */ - uint32_t queueMoveMessages( const std::string& srcQueue, - const std::string& destQueue, - uint32_t qty, - const qpid::types::Variant::Map& filter); + QPID_BROKER_EXTERN uint32_t queueMoveMessages( + const std::string& srcQueue, + const std::string& destQueue, + uint32_t qty, + const qpid::types::Variant::Map& filter); - boost::shared_ptr getProtocolFactory(const std::string& name = TCP_TRANSPORT) const; + QPID_BROKER_EXTERN boost::shared_ptr getProtocolFactory( + const std::string& name = TCP_TRANSPORT) const; /** Expose poller so plugins can register their descriptors. */ - boost::shared_ptr getPoller(); + QPID_BROKER_EXTERN boost::shared_ptr getPoller(); boost::shared_ptr getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr f) { factory = f; } @@ -291,7 +294,7 @@ public: /** Timer for tasks that must be synchronized if we are in a cluster */ sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; } - void setClusterTimer(std::auto_ptr); + QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr); boost::function ()> getKnownBrokers; @@ -322,15 +325,14 @@ public: * context. *@return true if delivery of a message should be deferred. */ - boost::function& msg)> deferDelivery; + boost::function& msg)> deferDelivery; bool isAuthenticating ( ) { return config.auth; } bool isTimestamping() { return config.timestampRcvMsgs; } typedef boost::function1 > QueueFunctor; - std::pair, bool> createQueue( + QPID_BROKER_EXTERN std::pair, bool> createQueue( const std::string& name, bool durable, bool autodelete, @@ -339,30 +341,39 @@ public: const qpid::framing::FieldTable& arguments, const std::string& userId, const std::string& connectionId); - void deleteQueue(const std::string& name, - const std::string& userId, - const std::string& connectionId, - QueueFunctor check = QueueFunctor()); - std::pair createExchange( + + QPID_BROKER_EXTERN void deleteQueue( + const std::string& name, + const std::string& userId, + const std::string& connectionId, + QueueFunctor check = QueueFunctor()); + + QPID_BROKER_EXTERN std::pair createExchange( const std::string& name, const std::string& type, bool durable, const std::string& alternateExchange, const qpid::framing::FieldTable& args, const std::string& userId, const std::string& connectionId); - void deleteExchange(const std::string& name, const std::string& userId, - const std::string& connectionId); - void bind(const std::string& queue, - const std::string& exchange, - const std::string& key, - const qpid::framing::FieldTable& arguments, - const std::string& userId, - const std::string& connectionId); - void unbind(const std::string& queue, - const std::string& exchange, - const std::string& key, - const std::string& userId, - const std::string& connectionId); + + QPID_BROKER_EXTERN void deleteExchange( + const std::string& name, const std::string& userId, + const std::string& connectionId); + + QPID_BROKER_EXTERN void bind( + const std::string& queue, + const std::string& exchange, + const std::string& key, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId); + + QPID_BROKER_EXTERN void unbind( + const std::string& queue, + const std::string& exchange, + const std::string& key, + const std::string& userId, + const std::string& connectionId); ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 1e6aab217c..5e339cec03 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -185,11 +185,13 @@ void Connection::recordFromServer(const framing::AMQFrame& frame) // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0 && isClusterSafe()) { - mgmtObject->inc_framesToClient(); - mgmtObject->inc_bytesToClient(frame.encodedSize()); + qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); + cStats->framesToClient += 1; + cStats->bytesToClient += frame.encodedSize(); if (isMessage(frame.getMethod())) { - mgmtObject->inc_msgsToClient(); + cStats->msgsToClient += 1; } + mgmtObject->statisticsUpdated(); } } @@ -198,11 +200,13 @@ void Connection::recordFromClient(const framing::AMQFrame& frame) // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0 && isClusterSafe()) { - mgmtObject->inc_framesFromClient(); - mgmtObject->inc_bytesFromClient(frame.encodedSize()); + qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); + cStats->framesFromClient += 1; + cStats->bytesFromClient += frame.encodedSize(); if (isMessage(frame.getMethod())) { - mgmtObject->inc_msgsFromClient(); + cStats->msgsFromClient += 1; } + mgmtObject->statisticsUpdated(); } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 858ab6f7f4..1b8bd83139 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -113,15 +113,20 @@ class Connection : public sys::ConnectionInputHandler, void requestIOProcessing (boost::function0); void recordFromServer (const framing::AMQFrame& frame); void recordFromClient (const framing::AMQFrame& frame); + + // gets for configured federation links std::string getAuthMechanism(); std::string getAuthCredentials(); std::string getUsername(); std::string getPassword(); std::string getHost(); uint16_t getPort(); + void notifyConnectionForced(const std::string& text); void setUserId(const std::string& uid); void raiseConnectEvent(); + + // credentials for connected client const std::string& getUserId() const { return ConnectionState::getUserId(); } const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index f1d43c5cdb..6894324117 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -28,6 +28,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ConnectionStartOkBody.h" #include "qpid/framing/enum.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/broker/AclModule.h" diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index 05c5f00c57..2e25543308 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -35,7 +35,6 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/Exception.h" -#include "qpid/broker/AclModule.h" #include "qpid/sys/SecurityLayer.h" diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 5591539853..5d9aea7509 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -153,8 +153,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c return true; } -void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +void DirectExchange::route(Deliverable& msg) { + const string& routingKey = msg.getMessage().getRoutingKey(); PreRoute pr(msg, this); ConstBindingList b; { diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index a6f9cf91af..833be52c1c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -57,9 +57,7 @@ public: const std::string& routingKey, const qpid::framing::FieldTable* args); virtual bool unbind(boost::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index febd547478..d482c2c327 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/DtxManager.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/StructHelper.h" #include "qpid/log/Statement.h" #include "qpid/sys/Timer.h" #include "qpid/ptr_map.h" @@ -55,7 +56,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptrprepare(); } catch (DtxTimeoutException& e) { @@ -66,7 +67,7 @@ bool DtxManager::prepare(const std::string& xid) bool DtxManager::commit(const std::string& xid, bool onePhase) { - QPID_LOG(debug, "committing: " << xid); + QPID_LOG(debug, "committing: " << convert(xid)); try { bool result = getWork(xid)->commit(onePhase); remove(xid); @@ -79,7 +80,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase) void DtxManager::rollback(const std::string& xid) { - QPID_LOG(debug, "rolling back: " << xid); + QPID_LOG(debug, "rolling back: " << convert(xid)); try { getWork(xid)->rollback(); remove(xid); @@ -94,7 +95,7 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); + throw NotFoundException(QPID_MSG("Unrecognised xid " << convert(xid))); } return ptr_map_ptr(i); } @@ -109,7 +110,7 @@ void DtxManager::remove(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); + throw NotFoundException(QPID_MSG("Unrecognised xid " << convert(xid))); } else { work.erase(i); } @@ -120,7 +121,7 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { - throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); + throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)")); } else { std::string ncxid = xid; // Work around const correctness problems in ptr_map. return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); @@ -175,3 +176,19 @@ void DtxManager::setStore (TransactionalStore* _store) { store = _store; } + +std::string DtxManager::convert(const qpid::framing::Xid& xid) +{ + qpid::framing::StructHelper helper; + std::string encoded; + helper.encode(xid, encoded); + return encoded; +} + +qpid::framing::Xid DtxManager::convert(const std::string& xid) +{ + qpid::framing::StructHelper helper; + qpid::framing::Xid decoded; + helper.decode(decoded, xid); + return decoded; +} diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 11895695a3..6f03189f66 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -26,6 +26,7 @@ #include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/Xid.h" #include "qpid/sys/Mutex.h" #include "qpid/ptr_map.h" @@ -74,6 +75,8 @@ public: } DtxWorkRecord* getWork(const std::string& xid); bool exists(const std::string& xid); + static std::string convert(const framing::Xid& xid); + static framing::Xid convert(const std::string& xid); }; } diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index a413fe418d..2c26fec49f 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/DtxWorkRecord.h" +#include "qpid/broker/DtxManager.h" #include "qpid/framing/reply_exceptions.h" #include #include @@ -73,7 +74,7 @@ bool DtxWorkRecord::commit(bool onePhase) if (prepared) { //already prepared i.e. 2pc if (onePhase) { - throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!")); } store->commit(*txn); @@ -84,7 +85,7 @@ bool DtxWorkRecord::commit(bool onePhase) } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!")); } std::auto_ptr localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -116,10 +117,10 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { Mutex::ScopedLock locker(lock); if (expired) { - throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out.")); + throw DtxTimeoutException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has timed out.")); } if (completed) { - throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); + throw CommandInvalidException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been completed!")); } work.push_back(ops); } @@ -133,7 +134,7 @@ bool DtxWorkRecord::check() //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { - throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " not completed!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " not completed!")); } else if ((*i)->isRollbackOnly()) { rolledback = true; } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index ecaa492903..8d20b0df81 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -32,7 +32,9 @@ #include "qpid/sys/ExceptionHolder.h" #include -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; @@ -135,20 +137,23 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); + qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics(); + uint64_t contentSize = msg.contentSize(); + + eStats->msgReceives += 1; + eStats->byteReceives += contentSize; if (count == 0) { //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); + eStats->msgDrops += 1; + eStats->byteDrops += contentSize; if (brokerMgmtObject) brokerMgmtObject->inc_discardsNoRoute(); } else { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + eStats->msgRoutes += count; + eStats->byteRoutes += count * contentSize; } } } @@ -156,7 +161,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); - route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + route(dmsg); } } @@ -399,9 +404,12 @@ void Exchange::setProperties(const boost::intrusive_ptr& msg) { bool Exchange::routeWithAlternate(Deliverable& msg) { - route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + route(msg); if (!msg.delivered && alternate) { - alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + alternate->route(msg); } return msg.delivered; } + +}} + diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 9179dd5c7c..7376f814ed 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -196,7 +196,7 @@ public: virtual bool unbind(boost::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool isBound(boost::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr&); - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg) = 0; //PersistableExchange: QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index fca77f7ddd..43d7268dfb 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" +#include "qpid/broker/Link.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" @@ -58,6 +59,8 @@ pair ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); }else if (type == ManagementTopicExchange::typeName) { exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); + }else if (type == Link::exchangeTypeName) { + exchange = Link::linkExchangeFactory(name); }else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 90ef81b49e..27b705fbe5 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -54,7 +54,7 @@ class ExchangeRegistry{ bool durable, const qpid::framing::FieldTable& args = framing::FieldTable()); QPID_BROKER_EXTERN void destroy(const std::string& name); - Exchange::shared_ptr getDefault(); + QPID_BROKER_EXTERN Exchange::shared_ptr getDefault(); /** * Find the named exchange. Return 0 if not found. @@ -75,7 +75,7 @@ class ExchangeRegistry{ /** Register an exchange instance. *@return true if registered, false if exchange with same name is already registered. */ - bool registerExchange(const Exchange::shared_ptr&); + QPID_BROKER_EXTERN bool registerExchange(const Exchange::shared_ptr&); QPID_BROKER_EXTERN void registerType(const std::string& type, FactoryFunction); @@ -85,7 +85,7 @@ class ExchangeRegistry{ for (ExchangeMap::const_iterator i = exchanges.begin(); i != exchanges.end(); ++i) f(i->second); } - + private: typedef std::map ExchangeMap; typedef std::map FunctionMap; diff --git a/cpp/src/qpid/broker/Fairshare.cpp b/cpp/src/qpid/broker/Fairshare.cpp index 313aa746f1..7cdad1a44f 100644 --- a/cpp/src/qpid/broker/Fairshare.cpp +++ b/cpp/src/qpid/broker/Fairshare.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include #include diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 5879fa0892..2bce99b6fe 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -101,7 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +void FanOutExchange::route(Deliverable& msg) { PreRoute pr(msg, this); doRoute(msg, bindings.snapshot()); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 1a7d486796..c979fdca25 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -54,9 +54,7 @@ class FanOutExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 142c23f276..6648ae0422 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -191,8 +191,9 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +void HeadersExchange::route(Deliverable& msg) { + const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (!args) { //can't match if there were no headers passed in if (mgmtExchange != 0) { diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 3b939d6851..d10892b9cc 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -98,9 +98,7 @@ class HeadersExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/cpp/src/qpid/broker/LegacyLVQ.cpp b/cpp/src/qpid/broker/LegacyLVQ.cpp index 49c0a32c19..f1deddf4c8 100644 --- a/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -28,16 +28,26 @@ namespace broker { LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} void LegacyLVQ::setNoBrowse(bool b) -{ +{ noBrowse = b; } +bool LegacyLVQ::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.payload == message.payload) { + erase(i); + return true; + } else { + return false; + } +} bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.payload == message.payload) { + if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) } const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) -{ +{ //add the new message into the original position of the replaced message Ordering::iterator i = messages.find(original.position); - i->second = update; - i->second.position = original.position; - return i->second; + if (i != messages.end()) { + i->second = update; + i->second.position = original.position; + return i->second; + } else { + QPID_LOG(error, "Failed to replace message at " << original.position); + return update; + } } void LegacyLVQ::removeIf(Predicate p) diff --git a/cpp/src/qpid/broker/LegacyLVQ.h b/cpp/src/qpid/broker/LegacyLVQ.h index 695e51131d..9355069f37 100644 --- a/cpp/src/qpid/broker/LegacyLVQ.h +++ b/cpp/src/qpid/broker/LegacyLVQ.h @@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap { public: LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool deleted(const QueuedMessage&); bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool push(const QueuedMessage& added, QueuedMessage& removed); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 4af1e6d6bd..f21c861149 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -31,6 +31,8 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/Exchange.h" +#include "qpid/UrlArray.h" namespace qpid { namespace broker { @@ -48,6 +50,13 @@ using std::stringstream; using std::string; namespace _qmf = ::qmf::org::apache::qpid::broker; + +namespace { + const std::string FAILOVER_EXCHANGE("amq.failover"); + const std::string FAILOVER_HEADER_KEY("amq.failover"); +} + + struct LinkTimerTask : public sys::TimerTask { LinkTimerTask(Link& l, sys::Timer& t) : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval* @@ -65,6 +74,57 @@ struct LinkTimerTask : public sys::TimerTask { sys::Timer& timer; }; + + +/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange. + */ +class LinkExchange : public broker::Exchange +{ +public: + LinkExchange(const std::string& name) : Exchange(name), link(0) {} + ~LinkExchange() {}; + std::string getType() const { return Link::exchangeTypeName; } + + // Exchange methods - set up to prevent binding/unbinding etc from clients! + bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*) { return false; } + bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*) { return false; } + bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const) {return false;} + + // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs + // and saving them should the Link need to reconnect. + void route(broker::Deliverable& msg) + { + if (!link) return; + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + framing::Array addresses; + if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) { + // convert the Array of addresses to a single Url container for used with setUrl(): + std::vector urlVec; + Url urls; + urlVec = urlArrayToVector(addresses); + for(size_t i = 0; i < urlVec.size(); ++i) + urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); + QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); + link->setUrl(urls); + } + } + + void setLink(Link *_link) + { + assert(!link); + link = _link; + } + +private: + Link *link; +}; + + +boost::shared_ptr Link::linkExchangeFactory( const std::string& _name ) +{ + return Exchange::shared_ptr(new LinkExchange(_name)); +} + Link::Link(LinkRegistry* _links, MessageStore* _store, const string& _host, @@ -76,8 +136,9 @@ Link::Link(LinkRegistry* _links, const string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + : links(_links), store(_store), + configuredTransport(_transport), configuredHost(_host), configuredPort(_port), + host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -88,7 +149,8 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), - timerTask(new LinkTimerTask(*this, broker->getTimer())) + timerTask(new LinkTimerTask(*this, broker->getTimer())), + failoverChannel(0) { if (parent != 0 && broker != 0) { @@ -106,15 +168,26 @@ Link::Link(LinkRegistry* _links, startConnectionLH(); } broker->getTimer().add(timerTask); + + stringstream _name; + _name << "qpid.link." << transport << ":" << host << ":" << port; + std::pair rc = broker->getExchanges().declare(_name.str(), + exchangeTypeName); + failoverExchange = boost::static_pointer_cast(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); } Link::~Link () { - if (state == STATE_OPERATIONAL && connection != 0) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + if (state == STATE_OPERATIONAL && connection != 0) { + closeConnection("closed by management"); + } if (mgmtObject != 0) mgmtObject->resourceDestroy (); + + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -180,11 +253,21 @@ void Link::established(Connection* c) void Link::setUrl(const Url& u) { + QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u); Mutex::ScopedLock mutex(lock); url = u; reconnectNext = 0; } + +namespace { + /** invoked when session used to subscribe to remote's amq.failover exchange detaches */ + void sessionDetached(Link *link) { + QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName()); + } +} + + void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; @@ -198,37 +281,74 @@ void Link::opened() { reconnectNext = 0; QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } + + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + failoverChannel = nextChannel(); + + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); + failoverSession = queueName; + sessionHandler.attachAs(failoverSession); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete + FieldTable()); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key + FieldTable()); + remoteBroker.getMessage().subscribe(queueName, + failoverExchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); } void Link::closed(int, std::string text) { - Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); - - connection = 0; + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); - if (state == STATE_OPERATIONAL) { - stringstream addr; - addr << host << ":" << port; - QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); - if (!hideManagement() && agent) - agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); - } + connection = 0; + if (state == STATE_OPERATIONAL) { + stringstream addr; + addr << host << ":" << port; + if (!hideManagement() && agent) + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->closed(); - created.push_back(*i); - } - active.clear(); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + created.push_back(*i); + } + active.clear(); - if (state != STATE_FAILED && state != STATE_PASSIVE) - { - setStateLH(STATE_WAITING); - if (!hideManagement()) - mgmtObject->set_lastError (text); + if (state != STATE_FAILED && state != STATE_PASSIVE) + { + setStateLH(STATE_WAITING); + if (!hideManagement()) + mgmtObject->set_lastError (text); + } } - - if (closing) + // Call destroy outside of the lock, don't want to be deleted with lock held. + if (isClosing) destroy(); } @@ -239,10 +359,8 @@ void Link::destroy () { Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - if (connection) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - connection = 0; + QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management"); + closeConnection("closed by management"); setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no @@ -263,7 +381,7 @@ void Link::destroy () for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->destroy(); toDelete.clear(); - links->destroy (host, port); + links->destroy (configuredHost, configuredPort); } void Link::add(Bridge::shared_ptr bridge) @@ -311,7 +429,7 @@ void Link::ioThreadProcessing() // check for bridge session errors and recover if (!active.empty()) { Bridges::iterator removed = std::remove_if( - active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1)); + active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1)); for (Bridges::iterator i = removed; i != active.end(); ++i) { Bridge::shared_ptr bridge = *i; bridge->closed(); @@ -398,14 +516,14 @@ bool Link::hideManagement() const { uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); - + if (channelCounter >= framing::CHANNEL_MAX) + channelCounter = 1; return channelCounter++; } void Link::notifyConnectionForced(const string text) { Mutex::ScopedLock mutex(lock); - setStateLH(STATE_FAILED); if (!hideManagement()) mgmtObject->set_lastError(text); @@ -418,7 +536,7 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return host; + return configuredHost; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) @@ -444,9 +562,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); - buffer.putShortString(host); - buffer.putShort(port); - buffer.putShortString(transport); + buffer.putShortString(configuredHost); + buffer.putShort(configuredPort); + buffer.putShortString(configuredTransport); buffer.putOctet(durable ? 1 : 0); buffer.putShortString(authMechanism); buffer.putShortString(username); @@ -455,10 +573,10 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return host.size() + 1 // short-string (host) + return configuredHost.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port - + transport.size() + 1 // short-string(transport) + + configuredTransport.size() + 1 // short-string(transport) + 1 // durable + authMechanism.size() + 1 + username.size() + 1 @@ -513,7 +631,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te } std::pair result = - links->declare (host, port, iargs.i_durable, iargs.i_src, + links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, iargs.i_dynamic, iargs.i_sync); @@ -542,4 +660,63 @@ void Link::setPassive(bool passive) } } + +/** utility to clean up connection resources correctly */ +void Link::closeConnection( const std::string& reason) +{ + if (connection != 0) { + // cancel our subscription to the failover exchange + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverSession); + } + connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); + connection = 0; + } +} + +/** returns the current remote's address, and connection state */ +bool Link::getRemoteAddress(qpid::Address& addr) const +{ + addr.protocol = transport; + addr.host = host; + addr.port = port; + + return state == STATE_OPERATIONAL; +} + + +// FieldTable keys for internal state data +namespace { + const std::string FAILOVER_ADDRESSES("failover-addresses"); + const std::string FAILOVER_INDEX("failover-index"); +} + +void Link::getState(framing::FieldTable& state) const +{ + state.clear(); + Mutex::ScopedLock mutex(lock); + if (!url.empty()) { + state.setString(FAILOVER_ADDRESSES, url.str()); + state.setInt(FAILOVER_INDEX, reconnectNext); + } +} + +void Link::setState(const framing::FieldTable& state) +{ + Mutex::ScopedLock mutex(lock); + if (state.isSet(FAILOVER_ADDRESSES)) { + Url failovers(state.getAsString(FAILOVER_ADDRESSES)); + setUrl(failovers); + } + if (state.isSet(FAILOVER_INDEX)) { + reconnectNext = state.getAsInt(FAILOVER_INDEX); + } +} + + +const std::string Link::exchangeTypeName("qpid.LinkExchange"); + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 4085c3bfcf..a97fa48664 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -24,9 +24,11 @@ #include #include "qpid/Url.h" +#include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/BrokerImportExport.h" #include "qpid/sys/Mutex.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" @@ -45,15 +47,23 @@ namespace broker { class LinkRegistry; class Broker; class Connection; +class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: - sys::Mutex lock; + mutable sys::Mutex lock; LinkRegistry* links; MessageStore* store; - std::string host; - uint16_t port; - std::string transport; + + // these remain constant across failover - used to identify this link + const std::string configuredTransport; + const std::string configuredHost; + const uint16_t configuredPort; + // these reflect the current address of remote - will change during failover + std::string host; + uint16_t port; + std::string transport; + bool durable; std::string authMechanism; std::string username; @@ -75,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; - boost::intrusive_ptr timerTask; + boost::shared_ptr failoverExchange; // subscribed to remote's amq.failover exchange + uint failoverChannel; + std::string failoverSession; static const int STATE_WAITING = 1; static const int STATE_CONNECTING = 2; @@ -94,6 +106,14 @@ class Link : public PersistableConfig, public management::Manageable { bool tryFailoverLH(); // Called during maintenance visit bool hideManagement() const; + void established(Connection*); // Called when connection is create + void opened(); // Called when connection is open (after create) + void closed(int, std::string); // Called when connection goes away + void reconnectLH(const Address&); //called by LinkRegistry + void closeConnection(const std::string& reason); + + friend class LinkRegistry; // to call established, opened, closed + public: typedef boost::shared_ptr shared_ptr; @@ -110,22 +130,25 @@ class Link : public PersistableConfig, public management::Manageable { management::Manageable* parent = 0); virtual ~Link(); - std::string getHost() { return host; } - uint16_t getPort() { return port; } - std::string getTransport() { return transport; } + /** these return the *configured* transport/host/port, which does not change over the + lifetime of the Link */ + std::string getHost() const { return configuredHost; } + uint16_t getPort() const { return configuredPort; } + std::string getTransport() const { return configuredTransport; } + + /** returns the current address of the remote, which may be different from the + configured transport/host/port due to failover. Returns true if connection is + active */ + bool getRemoteAddress(qpid::Address& addr) const; bool isDurable() { return durable; } void maintenanceVisit (); uint nextChannel(); void add(Bridge::shared_ptr); void cancel(Bridge::shared_ptr); - void setUrl(const Url&); // Set URL for reconnection. - void established(Connection*); // Called when connection is create - void opened(); // Called when connection is open (after create) - void closed(int, std::string); // Called when connection goes away - void reconnectLH(const Address&); //called by LinkRegistry - void close(); // Close the link from within the broker. + QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. + QPID_BROKER_EXTERN void close(); // Close the link from within the broker. std::string getAuthMechanism() { return authMechanism; } std::string getUsername() { return username; } @@ -148,6 +171,13 @@ class Link : public PersistableConfig, public management::Manageable { management::ManagementObject* GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); + // manage the exchange owned by this link + static const std::string exchangeTypeName; + static boost::shared_ptr linkExchangeFactory(const std::string& name); + + // replicate internal state of this Link for clustering + void getState(framing::FieldTable& state) const; + void setState(const framing::FieldTable& state); }; } } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index bb602bb953..d89f220d1b 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -25,7 +25,9 @@ #include #include -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::sys; using std::string; using std::pair; @@ -45,16 +47,15 @@ LinkRegistry::LinkRegistry () : { } -namespace { -struct ConnectionObserverImpl : public ConnectionObserver { +class LinkRegistryConnectionObserver : public ConnectionObserver { LinkRegistry& links; - ConnectionObserverImpl(LinkRegistry& l) : links(l) {} + public: + LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {} void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); } void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); } void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); } void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); } }; -} LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), @@ -62,7 +63,7 @@ LinkRegistry::LinkRegistry (Broker* _broker) : realm(broker->getOptions().realm) { broker->getConnectionObservers().add( - boost::shared_ptr(new ConnectionObserverImpl(*this))); + boost::shared_ptr(new LinkRegistryConnectionObserver(*this))); } LinkRegistry::~LinkRegistry() {} @@ -298,22 +299,29 @@ std::string LinkRegistry::getUsername(const std::string& key) return link->getUsername(); } +/** note: returns the current remote host (may be different from the host originally + configured for the Link due to failover) */ std::string LinkRegistry::getHost(const std::string& key) { - Link::shared_ptr link = findLink(key); - if (!link) - return string(); + Link::shared_ptr link = findLink(key); + if (!link) + return string(); - return link->getHost(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.host; } +/** returns the current remote port (ditto above) */ uint16_t LinkRegistry::getPort(const std::string& key) { Link::shared_ptr link = findLink(key); if (!link) return 0; - return link->getPort(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.port; } std::string LinkRegistry::getPassword(const std::string& key) @@ -368,3 +376,4 @@ void LinkRegistry::eachBridge(boost::function)> f for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); } +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 753f6bfe9e..8e9d2f4b0d 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -23,6 +23,7 @@ */ #include +#include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/MessageStore.h" #include "qpid/Address.h" @@ -56,43 +57,50 @@ namespace broker { static std::string createKey(const Address& address); static std::string createKey(const std::string& host, uint16_t port); + // Methods called by the connection observer. + void notifyConnection (const std::string& key, Connection* c); + void notifyOpened (const std::string& key); + void notifyClosed (const std::string& key); + void notifyConnectionForced (const std::string& key, const std::string& text); + friend class LinkRegistryConnectionObserver; + public: - LinkRegistry (); // Only used in store tests - LinkRegistry (Broker* _broker); - ~LinkRegistry(); - - std::pair, bool> - declare(const std::string& host, - uint16_t port, - const std::string& transport, - bool durable, - const std::string& authMechanism, - const std::string& username, - const std::string& password); - - std::pair - declare(const std::string& host, - uint16_t port, - bool durable, - const std::string& src, - const std::string& dest, - const std::string& key, - bool isQueue, - bool isLocal, - const std::string& id, - const std::string& excludes, - bool dynamic, - uint16_t sync, - Bridge::InitializeCallback=0 - ); - - void destroy(const std::string& host, const uint16_t port); - - void destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key); + QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests + QPID_BROKER_EXTERN LinkRegistry (Broker* _broker); + QPID_BROKER_EXTERN ~LinkRegistry(); + + QPID_BROKER_EXTERN std::pair, bool> + declare(const std::string& host, + uint16_t port, + const std::string& transport, + bool durable, + const std::string& authMechanism, + const std::string& username, + const std::string& password); + + QPID_BROKER_EXTERN std::pair + declare(const std::string& host, + uint16_t port, + bool durable, + const std::string& src, + const std::string& dest, + const std::string& key, + bool isQueue, + bool isLocal, + const std::string& id, + const std::string& excludes, + bool dynamic, + uint16_t sync, + Bridge::InitializeCallback=0 + ); + + QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port); + + QPID_BROKER_EXTERN void destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key); /** * Register the manageable parent for declared queues @@ -102,24 +110,20 @@ namespace broker { /** * Set the store to use. May only be called once. */ - void setStore (MessageStore*); + QPID_BROKER_EXTERN void setStore (MessageStore*); /** * Return the message store used. */ - MessageStore* getStore() const; + QPID_BROKER_EXTERN MessageStore* getStore() const; - void notifyConnection (const std::string& key, Connection* c); - void notifyOpened (const std::string& key); - void notifyClosed (const std::string& key); - void notifyConnectionForced (const std::string& key, const std::string& text); - std::string getAuthMechanism (const std::string& key); - std::string getAuthCredentials (const std::string& key); - std::string getAuthIdentity (const std::string& key); - std::string getUsername (const std::string& key); - std::string getPassword (const std::string& key); - std::string getHost (const std::string& key); - uint16_t getPort (const std::string& key); + QPID_BROKER_EXTERN std::string getAuthMechanism (const std::string& key); + QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key); + QPID_BROKER_EXTERN std::string getAuthIdentity (const std::string& key); + QPID_BROKER_EXTERN std::string getUsername (const std::string& key); + QPID_BROKER_EXTERN std::string getPassword (const std::string& key); + QPID_BROKER_EXTERN std::string getHost (const std::string& key); + QPID_BROKER_EXTERN uint16_t getPort (const std::string& key); /** * Called by links failing over to new address @@ -132,13 +136,13 @@ namespace broker { * updated but links won't actually establish connections and * bridges won't therefore pull or push any messages. */ - void setPassive(bool); - bool isPassive() { return passive; } + QPID_BROKER_EXTERN void setPassive(bool); + QPID_BROKER_EXTERN bool isPassive() { return passive; } /** Iterate over each link in the registry. Used for cluster updates. */ - void eachLink(boost::function)> f); + QPID_BROKER_EXTERN void eachLink(boost::function)> f); /** Iterate over each bridge in the registry. Used for cluster updates. */ - void eachBridge(boost::function)> f); + QPID_BROKER_EXTERN void eachBridge(boost::function)> f); }; } } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index ae4503328a..40dfba39f4 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -131,6 +131,7 @@ uint32_t Message::getRequiredCredit() void Message::encode(framing::Buffer& buffer) const { + sys::Mutex::ScopedLock l(lock); //encode method and header frames EncodeFrame f1(buffer); frames.map_if(f1, TypeFilter2()); @@ -142,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const void Message::encodeContent(framing::Buffer& buffer) const { + sys::Mutex::ScopedLock l(lock); //encode the payload of each content frame EncodeBody f2(buffer); frames.map_if(f2, TypeFilter()); @@ -154,11 +156,13 @@ uint32_t Message::encodedSize() const uint32_t Message::encodedContentSize() const { + sys::Mutex::ScopedLock l(lock); return frames.getContentSize(); } uint32_t Message::encodedHeaderSize() const { + sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size //add up the size for all method and header frames in the frameset SumFrameSize sum; frames.map_if(sum, TypeFilter2()); @@ -218,8 +222,9 @@ void Message::releaseContent() store->stage(pmsg); staged = true; } - //ensure required credit is cached before content frames are released + //ensure required credit and size is cached before content frames are released getRequiredCredit(); + contentSize(); //remove any content frames from the frameset frames.remove(TypeFilter()); setContentReleased(); @@ -354,6 +359,7 @@ public: AMQHeaderBody* Message::getHeaderBody() { + // expects lock to be held if (copyHeaderOnWrite) { CloneHeaderBody f; frames.map_if(f, TypeFilter()); diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index 709d99876b..f70c996975 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include "assert.h" namespace qpid { namespace broker { @@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position) bool MessageDeque::deleted(const QueuedMessage& m) { size_t i = index(m.position); - if (i < messages.size()) { + if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) { messages[i].status = QueuedMessage::DELETED; clean(); return true; @@ -53,7 +54,7 @@ size_t MessageDeque::size() return available; } -void MessageDeque::release(const QueuedMessage& message) +QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message) { size_t i = index(message.position); if (i < messages.size()) { @@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message) if (head > i) head = i; m.status = QueuedMessage::AVAILABLE; ++available; + return &messages[i]; } } else { + assert(0); QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); } + return 0; } +void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); } + bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { if (position < messages.front().position) return false; @@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) { } } // namespace -bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ +QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { //add padding to prevent gaps in sequence, which break the index //calculation (needed for queue replication) while (messages.size() && (added.position - messages.back().position) > 1) @@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed* messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; ++available; - return false;//adding a message never causes one to be removed for deque + return &messages.back(); +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { + pushPtr(added); + return false; // adding a message never causes one to be removed for deque } void MessageDeque::updateAcquired(const QueuedMessage& acquired) diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h index bb5943b09b..9b53716d4e 100644 --- a/cpp/src/qpid/broker/MessageDeque.h +++ b/cpp/src/qpid/broker/MessageDeque.h @@ -48,6 +48,12 @@ class MessageDeque : public Messages void foreach(Functor); void removeIf(Predicate); + // For use by other Messages implementations that use MessageDeque as a FIFO index + // and keep pointers to its elements in their own indexing strctures. + void clean(); + QueuedMessage* releasePtr(const QueuedMessage&); + QueuedMessage* pushPtr(const QueuedMessage& added); + private: typedef std::deque Deque; Deque messages; @@ -55,7 +61,6 @@ class MessageDeque : public Messages size_t head; size_t index(const framing::SequenceNumber&); - void clean(); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index 5f450cd556..15cd56a676 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -19,11 +19,13 @@ * */ +#include "qpid/broker/MessageGroupManager.h" + +#include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" -#include "qpid/types/Variant.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/MessageGroupManager.h" +#include "qpid/types/Variant.h" using namespace qpid::broker; @@ -43,9 +45,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +/** return an iterator to the message at position, or members.end() if not found */ +MessageGroupManager::GroupState::MessageFifo::iterator +MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position) +{ + MessageState mState(position); + MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState); + return (found->position == position) ? found : members.end(); +} + void MessageGroupManager::unFree( const GroupState& state ) { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + GroupFifo::iterator pos = freeGroups.find( state.members.front().position ); assert( pos != freeGroups.end() && pos->second == &state ); freeGroups.erase( pos ); } @@ -60,8 +71,8 @@ void MessageGroupManager::disown( GroupState& state ) { state.owner.clear(); assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; + assert(freeGroups.find(state.members.front().position) == freeGroups.end()); + freeGroups[state.members.front().position] = &state; } MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) @@ -106,7 +117,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm ) // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - state.members.push_back(qm.position); + GroupState::MessageState mState(qm.position); + state.members.push_back(mState); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << state.group << " total=" << total ); @@ -123,7 +135,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert(state.members.size()); // there are msgs present + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = true; state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); @@ -137,6 +151,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) GroupState& state = findGroup(qm); assert( state.acquired != 0 ); state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = false; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); @@ -152,13 +169,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert( state.members.size() != 0 ); - assert( state.acquired != 0 ); - state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + if (m->acquired) { + assert( state.acquired != 0 ); + state.acquired -= 1; + } - // likely to be at or near begin() if dequeued in order + // special case if qm is first (oldest) message in the group: + // may need to re-insert it back on the freeGroups list, as the index will change bool reFreeNeeded = false; - if (state.members.front() == qm.position) { + if (m == state.members.begin()) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! @@ -167,15 +188,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } state.members.pop_front(); } else { - GroupState::PositionFifo::iterator pos = state.members.begin() + 1; - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; - } + state.members.erase(m); } uint32_t total = state.members.size(); @@ -220,11 +233,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued GroupState& group = findGroup(next); if (!group.owned()) { //TODO: make acquire more efficient when we already have the message in question - if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! + if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group - << "'s head message still pending. pos=" << group.members.front()); + << "'s head message still pending. pos=" << group.members.front().position); } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } @@ -284,7 +297,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const info[GROUP_TIMESTAMP] = 0; if (g->second.members.size() != 0) { QueuedMessage qm; - if (messages.find(g->second.members.front(), qm) && + if (messages.find(g->second.members.front().position, qm) && qm.payload && qm.payload->hasProperties()) { info[GROUP_TIMESTAMP] = qm.payload->getProperties()->getTimestamp(); @@ -353,6 +366,7 @@ namespace { const std::string GROUP_OWNER("owner"); const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_ACQUIRED_MSGS("acquired-msgs"); const std::string GROUP_STATE("group-state"); } @@ -371,10 +385,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const group.setString(GROUP_OWNER, g->second.owner); group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); framing::Array positions(TYPE_CODE_UINT32); - for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) - positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) { + positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); + acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); + } group.setArray(GROUP_POSITIONS, positions); + group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); } state.setArray(GROUP_STATE, groupState); @@ -425,13 +443,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) qName << "\": position encoding error!"); return; } + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); + if (!ok || positions.count() != acquiredMsgs.count()) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": acquired flag encoding error!"); + return; + } + + Array::const_iterator a = acquiredMsgs.begin(); + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { + GroupState::MessageState mState((*p)->getIntegerValue()); + mState.acquired = (*a++)->getIntegerValue(); + state.members.push_back(mState); + } - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) - state.members.push_back((*p)->getIntegerValue()); messageGroups[state.group] = state; if (!state.owned()) { assert(state.members.size()); - freeGroups[state.members.front()] = &messageGroups[state.group]; + freeGroups[state.members.front().position] = &messageGroups[state.group]; } } diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..2dd97ea2ff 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.h +++ b/cpp/src/qpid/broker/MessageGroupManager.h @@ -28,11 +28,14 @@ #include "qpid/broker/MessageDistributor.h" #include "qpid/sys/unordered_map.h" +#include + namespace qpid { namespace broker { class QueueObserver; class MessageDistributor; +class Messages; class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor { @@ -45,19 +48,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu struct GroupState { // note: update getState()/setState() when changing this object's state implementation - typedef std::deque PositionFifo; + + // track which messages are in this group, and if they have been acquired + struct MessageState { + qpid::framing::SequenceNumber position; + bool acquired; + MessageState() : acquired(false) {} + MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {} + bool operator<(const MessageState& b) const { return position < b.position; } + }; + typedef std::deque MessageFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - PositionFifo members; // msgs belonging to this group + MessageFifo members; // msgs belonging to this group, in enqueue order GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} + MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &); }; typedef sys::unordered_map GroupMap; - typedef std::map GroupFifo; + typedef std::map GroupFifo; GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 048df45434..9b164d4e5c 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -27,7 +28,16 @@ namespace { const std::string EMPTY; } -bool MessageMap::deleted(const QueuedMessage&) { return true; } +bool MessageMap::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end()) { + erase(i); + return true; + } else { + return false; + } +} std::string MessageMap::getKey(const QueuedMessage& message) { @@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message) size_t MessageMap::size() { - return messages.size(); + size_t count(0); + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) ++count; + } + return count; } bool MessageMap::empty() { - return messages.empty(); + return size() == 0;//TODO: more efficient implementation } void MessageMap::release(const QueuedMessage& message) { - std::string key = getKey(message); - Index::iterator i = index.find(key); - if (i == index.end()) { - index[key] = message; - messages[message.position] = message; - } //else message has already been replaced + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + } } bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { message = i->second; return true; } else { @@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end()) { + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { message = i->second; return true; } else { @@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::consume(QueuedMessage& message) { - Ordering::iterator i = messages.begin(); - if (i != messages.end()) { - message = i->second; - erase(i); - return true; - } else { - return false; + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + return true; + } } + return false; } const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) @@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) if (result.second) { //there was no previous message for this key; nothing needs to //be removed, just add the message into its correct position - messages[added.position] = added; + QueuedMessage& a = messages[added.position]; + a = added; + a.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Added message at " << a.position); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); + result.first->second.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); return true; } } @@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - f(i->second); + if (i->second.status == QueuedMessage::AVAILABLE) f(i->second); } } void MessageMap::removeIf(Predicate p) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { - if (p(i->second)) { - erase(i); + for (Ordering::iterator i = messages.begin(); i != messages.end();) { + if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) { + index.erase(getKey(i->second)); + //Note: Removing from messages means that the subsequent + //call to deleted() for the same message will return + //false. At present that is not a problem. If this were + //changed to hold onto the message until dequeued + //(e.g. with REMOVED state), then the erase() below would + //need to take that into account. + messages.erase(i++); + } else { + ++i; } } } diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h index d1b8217f9b..a668450250 100644 --- a/cpp/src/qpid/broker/MessageMap.h +++ b/cpp/src/qpid/broker/MessageMap.h @@ -43,7 +43,7 @@ class MessageMap : public Messages size_t size(); bool empty(); - bool deleted(const QueuedMessage&); + virtual bool deleted(const QueuedMessage&); void release(const QueuedMessage&); virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp index d807ef22b1..ab5ec7235a 100644 --- a/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -3,13 +3,13 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownersip. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,96 +22,87 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" #include namespace qpid { namespace broker { -PriorityQueue::PriorityQueue(int l) : +PriorityQueue::PriorityQueue(int l) : levels(l), messages(levels, Deque()), frontLevel(0), haveFront(false), cached(false) {} -bool PriorityQueue::deleted(const QueuedMessage&) { return true; } +bool PriorityQueue::deleted(const QueuedMessage& qm) { + bool deleted = fifo.deleted(qm); + if (deleted) erase(qm); + return deleted; +} size_t PriorityQueue::size() { - size_t total(0); - for (int i = 0; i < levels; ++i) { - total += messages[i].size(); - } - return total; + return fifo.size(); +} + +namespace { +bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; } } void PriorityQueue::release(const QueuedMessage& message) { - uint p = getPriorityLevel(message); - messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); - clearCache(); + QueuedMessage* qm = fifo.releasePtr(message); + if (qm) { + uint p = getPriorityLevel(message); + messages[p].insert( + lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm); + clearCache(); + } } -bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) -{ - QueuedMessage comp; - comp.position = position; - for (int i = 0; i < levels; ++i) { - if (!messages[i].empty()) { - unsigned long diff = position.getValue() - messages[i].front().position.getValue(); - long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); - Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); - if (l != messages[i].end() && l->position == position) { - message = *l; - if (remove) { - messages[i].erase(l); - clearCache(); - } - return true; - } + +void PriorityQueue::erase(const QueuedMessage& qm) { + size_t i = getPriorityLevel(qm); + if (!messages[i].empty()) { + long diff = qm.position.getValue() - messages[i].front()->position.getValue(); + if (diff < 0) return; + long maxEnd = std::min(size_t(diff), messages[i].size()); + QueuedMessage mutableQm = qm; // need non-const qm for lower_bound + Deque::iterator l = + lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before); + if (l != messages[i].end() && (*l)->position == qm.position) { + messages[i].erase(l); + clearCache(); + return; } } - return false; } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, true); + bool acquired = fifo.acquire(position, message); + if (acquired) erase(message); // No longer available + return acquired; } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, false); + return fifo.find(position, message); } -bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool PriorityQueue::browse( + const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - QueuedMessage match; - match.position = position+1; - Deque::iterator lowest; - bool found = false; - for (int i = 0; i < levels; ++i) { - Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); - if (m != messages[i].end()) { - if (m->position == match.position) { - message = *m; - return true; - } else if (!found || m->position < lowest->position) { - lowest = m; - found = true; - } - } - } - if (found) { - message = *lowest; - } - return found; + return fifo.browse(position, message, unacquired); } bool PriorityQueue::consume(QueuedMessage& message) { if (checkFront()) { - message = messages[frontLevel].front(); + QueuedMessage* pm = messages[frontLevel].front(); messages[frontLevel].pop_front(); clearCache(); + pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index + message = *pm; return true; } else { return false; @@ -120,23 +111,27 @@ bool PriorityQueue::consume(QueuedMessage& message) bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - messages[getPriorityLevel(added)].push_back(added); + QueuedMessage* qmp = fifo.pushPtr(added); + messages[getPriorityLevel(added)].push_back(qmp); clearCache(); - return false;//adding a message never causes one to be removed for deque + return false; // Adding a message never causes one to be removed for deque +} + +void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { + fifo.updateAcquired(acquired); } void PriorityQueue::foreach(Functor f) { - for (int i = 0; i < levels; ++i) { - std::for_each(messages[i].begin(), messages[i].end(), f); - } + fifo.foreach(f); } void PriorityQueue::removeIf(Predicate p) { for (int priority = 0; priority < levels; ++priority) { for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { - if (p(*i)) { + if (p(**i)) { + (*i)->status = QueuedMessage::DELETED; // Updates fifo index i = messages[priority].erase(i); clearCache(); } else { @@ -144,6 +139,7 @@ void PriorityQueue::removeIf(Predicate p) } } } + fifo.clean(); } uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h index 67c31468d2..8628745db1 100644 --- a/cpp/src/qpid/broker/PriorityQueue.h +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,7 @@ * under the License. * */ -#include "qpid/broker/Messages.h" +#include "qpid/broker/MessageDeque.h" #include "qpid/sys/IntegerTypes.h" #include #include @@ -32,7 +32,10 @@ namespace broker { /** * Basic priority queue with a configurable number of recognised * priority levels. This is implemented as a separate deque per - * priority level. Browsing is FIFO not priority order. + * priority level. + * + * Browsing is FIFO not priority order. There is a MessageDeque + * for fast browsing. */ class PriorityQueue : public Messages { @@ -48,23 +51,31 @@ class PriorityQueue : public Messages bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); - + void updateAcquired(const QueuedMessage& acquired); void foreach(Functor); void removeIf(Predicate); + static uint getPriority(const QueuedMessage&); + protected: - typedef std::deque Deque; + typedef std::deque Deque; typedef std::vector PriorityLevels; virtual bool findFrontLevel(uint& p, PriorityLevels&); const int levels; + private: + /** Available messages separated by priority and sorted in priority order. + * Holds pointers to the QueuedMessages in fifo + */ PriorityLevels messages; + /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */ + MessageDeque fifo; uint frontLevel; bool haveFront; bool cached; - - bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); + + void erase(const QueuedMessage&); uint getPriorityLevel(const QueuedMessage&) const; void clearCache(); bool checkFront(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 015957927f..e7305c021d 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,8 +19,9 @@ * */ -#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Fairshare.h" @@ -41,6 +42,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" @@ -56,7 +58,9 @@ #include -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; @@ -88,8 +92,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; + +inline void mgntEnqStats(const boost::intrusive_ptr& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0) { + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + + uint64_t contentSize = msg->contentSize(); + qStats->msgTotalEnqueues +=1; + bStats->msgTotalEnqueues += 1; + qStats->byteTotalEnqueues += contentSize; + bStats->byteTotalEnqueues += contentSize; + if (msg->isPersistent ()) { + qStats->msgPersistEnqueues += 1; + bStats->msgPersistEnqueues += 1; + qStats->bytePersistEnqueues += contentSize; + bStats->bytePersistEnqueues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } +} + +inline void mgntDeqStats(const boost::intrusive_ptr& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0){ + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + uint64_t contentSize = msg->contentSize(); + + qStats->msgTotalDequeues += 1; + bStats->msgTotalDequeues += 1; + qStats->byteTotalDequeues += contentSize; + bStats->byteTotalDequeues += contentSize; + if (msg->isPersistent ()){ + qStats->msgPersistDequeues += 1; + bStats->msgPersistDequeues += 1; + qStats->bytePersistDequeues += contentSize; + bStats->bytePersistDequeues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } } +} // namespace + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -101,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), + browserCount(0), exclusive(0), noLocal(false), persistLastNode(false), @@ -166,7 +220,7 @@ void Queue::deliver(boost::intrusive_ptr msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + alternateExchange->route(deliverable); } } else if (isLocal(msg)) { //drop message @@ -183,11 +237,16 @@ void Queue::deliver(boost::intrusive_ptr msg){ void Queue::recoverPrepared(boost::intrusive_ptr& msg) { + Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->recoverEnqueued(msg); } -void Queue::recover(boost::intrusive_ptr& msg){ - if (policy.get()) policy->recoverEnqueued(msg); +void Queue::recover(boost::intrusive_ptr& msg) +{ + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->recoverEnqueued(msg); + } push(msg, true); if (store){ @@ -209,11 +268,16 @@ void Queue::recover(boost::intrusive_ptr& msg){ void Queue::process(boost::intrusive_ptr& msg){ push(msg); if (mgmtObject != 0){ - mgmtObject->inc_msgTxnEnqueues (); - mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + const uint64_t contentSize = msg->contentSize(); + qStats->msgTxnEnqueues += 1; + qStats->byteTxnEnqueues += contentSize; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgTxnEnqueues (); - brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgTxnEnqueues += 1; + bStats->byteTxnEnqueues += contentSize; + brokerMgmtObject->statisticsUpdated(); } } } @@ -222,7 +286,6 @@ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; if (deleted) { // @@ -238,10 +301,20 @@ void Queue::requeue(const QueuedMessage& msg){ if (brokerMgmtObject) brokerMgmtObject->inc_abandoned(); } - mgntDeqStats(msg.payload); + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); } else { - messages->release(msg); - listeners.populate(copy); + { + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + observeRequeue(msg, locker); + listeners.populate(copy); + } + + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { @@ -251,7 +324,6 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } - observeRequeue(msg, locker); } } copy.notify(); @@ -259,10 +331,9 @@ void Queue::requeue(const QueuedMessage& msg){ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (acquire(position, message, locker)) { + if (acquire(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -273,17 +344,20 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - - if (!allocator->allocate( consumer, msg )) { + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = allocator->allocate( consumer, msg ); + } + if (!ok) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } QueuedMessage copy(msg); - if (acquire( msg.position, copy, locker)) { + if (acquire( msg.position, copy)) { QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); return true; } @@ -325,59 +399,73 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (allocator->nextConsumableMessage(c, msg)) { - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - dequeue(0, msg); - if (mgmtObject) { - mgmtObject->inc_discardsTtl(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(); - } + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextConsumableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + return NO_MESSAGES; + } - continue; + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->setPosition(msg.position); + dequeue(0, msg); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); } + continue; + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + { + Mutex::ScopedLock locker(messageLock); bool ok = allocator->allocate( c->getName(), msg ); // inform allocator (void) ok; assert(ok); observeAcquire(msg, locker); - m = msg; - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; } + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + m = msg; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); } } else { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - listeners.addListener(c); - return NO_MESSAGES; + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); } + + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + return CANT_CONSUME; } } bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - - if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextBrowsableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { // no next available QPID_LOG(debug, "No browsable messages available for consumer " << c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); return false; } @@ -435,60 +523,67 @@ bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); { - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + Mutex::ScopedLock locker(messageLock); + // NOTE: consumerCount is actually a count of all + // subscriptions, both acquiring and non-acquiring (browsers). + // Check for exclusivity of acquiring consumers. + size_t acquiringConsumers = consumerCount - browserCount; + if (c->preAcquires()) { + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() + << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(acquiringConsumers) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() + << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } } + else + browserCount++; consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); } + observeConsumerAdd(*c, locker); } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerAdded(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); - } - } + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); consumerCount--; + if (!c->preAcquires()) browserCount--; if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); - } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerRemoved(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); - } + observeConsumerRemove(*c, locker); } + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); } QueuedMessage Queue::get(){ - Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->consume(msg)) - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->consume(msg); + if (ok) observeAcquire(msg, locker); + } + + if (ok && mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + return msg; } @@ -520,22 +615,26 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - // - // Report the count of discarded-by-ttl messages - // - if (mgmtObject && !expired.empty()) { - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(expired.size()); - } + if (!expired.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(expired.size()); + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(expired.size()); + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + } - for (std::deque::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); + for (std::deque::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); } - dequeue( 0, *i ); } } } @@ -661,32 +760,46 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr std::auto_ptr mf(MessageFilter::create(filter)); Collector c(*mf.get(), purge_request); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind(boost::ref(c), _1) ); + } - if (mgmtObject && !c.matches.empty()) { - if (dest.get()) { - mgmtObject->inc_reroutes(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_reroutes(c.matches.size()); - } else { - mgmtObject->inc_discardsPurge(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsPurge(c.matches.size()); + if (!c.matches.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_reroutes(c.matches.size()); + } + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } } - } - for (std::deque::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { - // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); - // now reroute if necessary - if (dest.get()) { - assert(qmsg->payload); - DeliverableMessage dmsg(qmsg->payload); - dest->routeWithAlternate(dmsg); + for (std::deque::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); + // now reroute if necessary + if (dest.get()) { + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); + } } } return c.matches.size(); @@ -698,27 +811,51 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, std::auto_ptr mf(MessageFilter::create(filter)); Collector c(*mf.get(), qty); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind(boost::ref(c), _1) ); + } + - for (std::deque::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { + if (!c.matches.empty()) { // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - // and move to destination Queue. - assert(qmsg->payload); - destq->deliver(qmsg->payload); + + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(c.matches.size()); + } + + for (std::deque::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); + } } return c.matches.size(); } /** Acquire the message at the given position, return true and msg if acquire succeeds */ -bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const Mutex::ScopedLock& locker) +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg) { - if (messages->acquire(position, msg)) { - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->acquire(position, msg); + if (ok) observeAcquire(msg, locker); + } + if (ok) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } ++dequeueSincePurge; return true; } @@ -728,35 +865,43 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; - QueuedMessage removed; + QueuedMessage removed, qm(this, msg); bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); - QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); - - dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) { + qm.position = ++sequence; + if (messages->push(qm, removed)) { + dequeueRequired = true; observeAcquire(removed, locker); - if (mgmtObject) { - mgmtObject->inc_discardsLvq(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsLvq(); - } } - listeners.populate(copy); observeEnqueue(qm, locker); + if (policy.get()) { + policy->enqueued(qm); + } + listeners.populate(copy); } - copy.notify(); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position); + + mgntEnqStats(msg, mgmtObject, brokerMgmtObject); + if (dequeueRequired) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + brokerMgmtObject->inc_discardsLvq(); + } if (isRecovery) { //can't issue new requests for the store until //recovery is complete + Mutex::ScopedLock locker(messageLock); pendingDequeues.push_back(removed); } else { dequeue(0, removed); } } + copy.notify(); } void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) @@ -767,8 +912,8 @@ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) /** function only provided for unit tests, or code not in critical message path */ uint32_t Queue::getEnqueueCompleteMessageCount() const { - Mutex::ScopedLock locker(messageLock); uint32_t count = 0; + Mutex::ScopedLock locker(messageLock); messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } @@ -781,13 +926,13 @@ uint32_t Queue::getMessageCount() const uint32_t Queue::getConsumerCount() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return consumerCount; } bool Queue::canAutoDelete() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return autodelete && !consumerCount && !owner; } @@ -894,14 +1039,20 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); observeDequeue(msg, locker); } } + + if (!ctxt) { + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); + } + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -918,14 +1069,24 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - observeDequeue(msg, locker); + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); + observeDequeue(msg, locker); + } + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); if (mgmtObject != 0) { - mgmtObject->inc_msgTxnDequeues(); - mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + const uint64_t contentSize = msg.payload->contentSize(); + qStats->msgTxnDequeues += 1; + qStats->byteTxnDequeues += contentSize; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgTxnDequeues(); - brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgTxnDequeues += 1; + bStats->byteTxnDequeues += contentSize; + brokerMgmtObject->statisticsUpdated(); } } } @@ -934,10 +1095,20 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) +bool Queue::popAndDequeue(QueuedMessage& msg) { - if (messages->consume(msg)) { - observeAcquire(msg, locker); + bool popped; + { + Mutex::ScopedLock locker(messageLock); + popped = messages->consume(msg); + if (popped) observeAcquire(msg, locker); + } + if (popped) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } dequeue(0, msg); return true; } else { @@ -947,13 +1118,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) /** * Updates policy and management when a message has been dequeued, - * expects messageLock to be held + * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - mgntDeqStats(msg.payload); - if (policy.get()) policy->dequeued(msg); - messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -963,17 +1131,11 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become unavailable for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become unavailable for transfer. + * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -983,17 +1145,11 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become re-available for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become re-available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_releases(); - if (brokerMgmtObject) - brokerMgmtObject->inc_releases(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1003,6 +1159,33 @@ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } +/** updates queue observers when a new consumer has subscribed to this queue. + */ +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a consumer has unsubscribed from this queue. + */ +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } +} + + void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -1150,23 +1333,21 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - { - Mutex::ScopedLock locker(messageLock); - QueuedMessage m; - while(popAndDequeue(m, locker)) { - DeliverableMessage msg(m.payload); - if (alternateExchange.get()) { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandonedViaAlt(); - alternateExchange->routeWithAlternate(msg); - } else { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandoned(); - } + + QueuedMessage m; + while(popAndDequeue(m)) { + DeliverableMessage msg(m.payload); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } - if (alternateExchange.get()) - alternateExchange->decAlternateUsers(); } + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); if (store) { barrier.destroy(); @@ -1177,7 +1358,7 @@ void Queue::destroyed() if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr(); notifyDeleted(); { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.clear(); } } @@ -1187,8 +1368,8 @@ void Queue::notifyDeleted() QueueListeners::ListenerSet set; { Mutex::ScopedLock locker(messageLock); - listeners.snapshot(set); deleted = true; + listeners.snapshot(set); } set.notifyAll(); } @@ -1206,6 +1387,7 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr _policy) { + Mutex::ScopedLock locker(messageLock); policy = _policy; if (policy.get()) policy->setQueue(this); @@ -1213,6 +1395,7 @@ void Queue::setPolicy(std::auto_ptr _policy) const QueuePolicy* Queue::getPolicy() { + Mutex::ScopedLock locker(messageLock); return policy.get(); } @@ -1302,7 +1485,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Queue::shared_ptr queue; AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} void fire() { @@ -1388,11 +1571,15 @@ void Queue::countRejected() const void Queue::countFlowedToDisk(uint64_t size) const { if (mgmtObject) { - mgmtObject->inc_msgFtdEnqueues(); - mgmtObject->inc_byteFtdEnqueues(size); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qStats->msgFtdEnqueues += 1; + qStats->byteFtdEnqueues += size; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgFtdEnqueues(); - brokerMgmtObject->inc_byteFtdEnqueues(size); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgFtdEnqueues += 1; + bStats->byteFtdEnqueues += size; + brokerMgmtObject->statisticsUpdated(); } } } @@ -1400,11 +1587,15 @@ void Queue::countFlowedToDisk(uint64_t size) const void Queue::countLoadedFromDisk(uint64_t size) const { if (mgmtObject) { - mgmtObject->inc_msgFtdDequeues(); - mgmtObject->inc_byteFtdDequeues(size); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qStats->msgFtdDequeues += 1; + qStats->byteFtdDequeues += size; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgFtdDequeues(); - brokerMgmtObject->inc_byteFtdDequeues(size); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgFtdDequeues += 1; + bStats->byteFtdDequeues += size; + brokerMgmtObject->statisticsUpdated(); } } } @@ -1434,9 +1625,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str { _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; boost::shared_ptr dest; - if (rerouteArgs.i_useAltExchange) + if (rerouteArgs.i_useAltExchange) { + if (!alternateExchange) { + status = Manageable::STATUS_PARAMETER_INVALID; + etext = "No alternate-exchange defined"; + break; + } dest = alternateExchange; - else { + } else { try { dest = broker->getExchanges().get(rerouteArgs.i_exchange); } catch(const std::exception&) { @@ -1486,8 +1682,12 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) << "\": exchange does not exist."); } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - pendingDequeues.clear(); + std::deque pd; + { + Mutex::ScopedLock locker(messageLock); + pendingDequeues.swap(pd); + } + for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } void Queue::insertSequenceNumbers(const std::string& key) @@ -1497,10 +1697,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -/** updates queue observers and state when a message has become available for transfer, - * expects messageLock to be held +/** updates queue observers and state when a message has become available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) +void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1509,10 +1709,6 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } } - if (policy.get()) { - policy->enqueued(m); - } - mgntEnqStats(m.payload); } void Queue::updateEnqueued(const QueuedMessage& m) @@ -1520,12 +1716,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (m.payload) { boost::intrusive_ptr payload = m.payload; enqueue(0, payload, true); - messages->updateAcquired(m); - if (policy.get()) { - policy->recoverEnqueued(payload); + { + Mutex::ScopedLock locker(messageLock); + messages->updateAcquired(m); + observeEnqueue(m, locker); + if (policy.get()) { + policy->recoverEnqueued(payload); + policy->enqueued(m); + } } - Mutex::ScopedLock locker(messageLock); - observeEnqueue(m, locker); + mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1533,10 +1733,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) bool Queue::isEnqueued(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); return !policy.get() || policy->isEnqueued(msg); } +// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while listeners is referenced. QueueListeners& Queue::getListeners() { return listeners; } + +// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while messages is referenced. Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } @@ -1549,13 +1755,13 @@ void Queue::checkNotDeleted(const Consumer::shared_ptr& c) void Queue::addObserver(boost::shared_ptr observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.insert(observer); } void Queue::removeObserver(boost::shared_ptr observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.erase(observer); } @@ -1618,7 +1824,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() { - Monitor::ScopedLock l(parent.messageLock); + Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */ if (parent.deleted) { return false; } else { @@ -1639,3 +1845,6 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +}} + diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e8573c17cc..9869a698c1 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -97,7 +97,8 @@ class Queue : public boost::enable_shared_from_this, const bool autodelete; MessageStore* store; const OwnershipToken* owner; - uint32_t consumerCount; + uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not. + uint32_t browserCount; // Count of non-acquiring subscriptions. OwnershipToken* exclusive; bool noLocal; bool persistLastNode; @@ -107,7 +108,22 @@ class Queue : public boost::enable_shared_from_this, QueueListeners listeners; std::auto_ptr messages; std::deque pendingDequeues;//used to avoid dequeuing during recovery - mutable qpid::sys::Mutex consumerLock; + /** messageLock is used to keep the Queue's state consistent while processing message + * events, such as message dispatch, enqueue, acquire, and dequeue. It must be held + * while updating certain members in order to keep these members consistent with + * each other: + * o messages + * o sequence + * o policy + * o listeners + * o allocator + * o observeXXX() methods + * o observers + * o pendingDequeues (TBD: move under separate lock) + * o exclusive OwnershipToken (TBD: move under separate lock) + * o consumerCount (TBD: move under separate lock) + * o Queue::UsageBarrier (TBD: move under separate lock) + */ mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; @@ -143,52 +159,20 @@ class Queue : public boost::enable_shared_from_this, bool isExcluded(boost::intrusive_ptr& msg); - /** update queue observers, stats, policy, etc when the messages' state changes. Lock - * must be held by caller */ + /** update queue observers, stats, policy, etc when the messages' state changes. + * messageLock is held by caller */ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock); - // acquire message @ position, return true and set msg if acquire succeeds - bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const sys::Mutex::ScopedLock& held); + void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock); + void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock); + bool popAndDequeue(QueuedMessage&); + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg); void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); - - inline void mgntEnqStats(const boost::intrusive_ptr& msg) - { - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - brokerMgmtObject->inc_msgTotalEnqueues (); - brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - brokerMgmtObject->inc_msgPersistEnqueues (); - brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - } - } - inline void mgntDeqStats(const boost::intrusive_ptr& msg) - { - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - brokerMgmtObject->inc_msgTotalDequeues (); - brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); - brokerMgmtObject->inc_msgPersistDequeues (); - brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize()); - } - } - } - void checkNotDeleted(const Consumer::shared_ptr& c); void notifyDeleted(); @@ -235,8 +219,9 @@ class Queue : public boost::enable_shared_from_this, /** * Bind self to specified exchange, and record that binding for unbinding on delete. */ - bool bind(boost::shared_ptr exchange, const std::string& key, - const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); + QPID_BROKER_EXTERN bool bind( + boost::shared_ptr exchange, const std::string& key, + const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); /** Acquire the message at the given position if it is available for acquire. Not to * be used by clients, but used by the broker for queue management. @@ -271,28 +256,29 @@ class Queue : public boost::enable_shared_from_this, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages boost::shared_ptr dest=boost::shared_ptr(), const ::qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty, - const qpid::types::Variant::Map *filter=0); + QPID_BROKER_EXTERN uint32_t move( + const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; QPID_BROKER_EXTERN uint32_t getConsumerCount() const; inline const std::string& getName() const { return name; } - bool isExclusiveOwner(const OwnershipToken* const o) const; - void releaseExclusiveOwnership(); - bool setExclusiveOwner(const OwnershipToken* const o); - bool hasExclusiveConsumer() const; - bool hasExclusiveOwner() const; + QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const; + QPID_BROKER_EXTERN void releaseExclusiveOwnership(); + QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o); + QPID_BROKER_EXTERN bool hasExclusiveConsumer() const; + QPID_BROKER_EXTERN bool hasExclusiveOwner() const; inline bool isDurable() const { return store != 0; } inline const framing::FieldTable& getSettings() const { return settings; } inline bool isAutoDelete() const { return autodelete; } - bool canAutoDelete() const; + QPID_BROKER_EXTERN bool canAutoDelete() const; const QueueBindings& getBindings() const { return bindings; } /** @@ -301,8 +287,8 @@ class Queue : public boost::enable_shared_from_this, QPID_BROKER_EXTERN void setLastNodeFailure(); QPID_BROKER_EXTERN void clearLastNodeFailure(); - bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg, bool suppressPolicyCheck = false); - void enqueueAborted(boost::intrusive_ptr msg); + QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg, bool suppressPolicyCheck = false); + QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr msg); /** * dequeue from store (only done once messages is acknowledged) */ @@ -311,7 +297,7 @@ class Queue : public boost::enable_shared_from_this, * Inform the queue that a previous transactional dequeue * committed. */ - void dequeueCommitted(const QueuedMessage& msg); + QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg); /** * Inform queue of messages that were enqueued, have since @@ -319,7 +305,7 @@ class Queue : public boost::enable_shared_from_this, * thus are still logically on the queue) - used in * clustered broker. */ - void updateEnqueued(const QueuedMessage& msg); + QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg); /** * Test whether the specified message (identified by its @@ -328,7 +314,7 @@ class Queue : public boost::enable_shared_from_this, * have been delievered to a subscriber who has not yet * accepted it). */ - bool isEnqueued(const QueuedMessage& msg); + QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg); /** * Acquires the next available (oldest) message @@ -338,17 +324,17 @@ class Queue : public boost::enable_shared_from_this, /** Get the message at position pos, returns true if found and sets msg */ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; - const QueuePolicy* getPolicy(); + QPID_BROKER_EXTERN const QueuePolicy* getPolicy(); - void setAlternateExchange(boost::shared_ptr exchange); - boost::shared_ptr getAlternateExchange(); - bool isLocal(boost::intrusive_ptr& msg); + QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr exchange); + QPID_BROKER_EXTERN boost::shared_ptr getAlternateExchange(); + QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr& msg); //PersistableQueue support: - uint64_t getPersistenceId() const; - void setPersistenceId(uint64_t persistenceId) const; - void encode(framing::Buffer& buffer) const; - uint32_t encodedSize() const; + QPID_BROKER_EXTERN uint64_t getPersistenceId() const; + QPID_BROKER_EXTERN void setPersistenceId(uint64_t persistenceId) const; + QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const; + QPID_BROKER_EXTERN uint32_t encodedSize() const; /** * Restores a queue from encoded data (used in recovery) @@ -362,15 +348,15 @@ class Queue : public boost::enable_shared_from_this, virtual void setExternalQueueStore(ExternalQueueStore* inst); // Increment the rejected-by-consumer counter. - void countRejected() const; - void countFlowedToDisk(uint64_t size) const; - void countLoadedFromDisk(uint64_t size) const; + QPID_BROKER_EXTERN void countRejected() const; + QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const; + QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; + QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void query(::qpid::types::Variant::Map&) const; + QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template void eachMessage(F f) { @@ -385,6 +371,7 @@ class Queue : public boost::enable_shared_from_this, /** Apply f to each Observer on the queue */ template void eachObserver(F f) { + sys::Mutex::ScopedLock l(messageLock); std::for_each(observers.begin(), observers.end(), f); } @@ -396,31 +383,31 @@ class Queue : public boost::enable_shared_from_this, /** return current position sequence number for the next message on the queue. */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); - void addObserver(boost::shared_ptr); - void removeObserver(boost::shared_ptr); + QPID_BROKER_EXTERN void addObserver(boost::shared_ptr); + QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. */ - void recoveryComplete(ExchangeRegistry& exchanges); + QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges); // For cluster update - QueueListeners& getListeners(); - Messages& getMessages(); - const Messages& getMessages() const; + QPID_BROKER_EXTERN QueueListeners& getListeners(); + QPID_BROKER_EXTERN Messages& getMessages(); + QPID_BROKER_EXTERN const Messages& getMessages() const; /** * Reserve space in policy for an enqueued message that * has been recovered in the prepared state (dtx only) */ - void recoverPrepared(boost::intrusive_ptr& msg); + QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr& msg); - void flush(); + QPID_BROKER_EXTERN void flush(); - Broker* getBroker(); + QPID_BROKER_EXTERN Broker* getBroker(); uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } - void setDequeueSincePurge(uint32_t value); + QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value); }; } } diff --git a/cpp/src/qpid/broker/QueueListeners.cpp b/cpp/src/qpid/broker/QueueListeners.cpp index 32c208b073..0338a674cf 100644 --- a/cpp/src/qpid/broker/QueueListeners.cpp +++ b/cpp/src/qpid/broker/QueueListeners.cpp @@ -79,10 +79,6 @@ void QueueListeners::NotificationSet::notify() std::for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify)); } -bool QueueListeners::contains(Consumer::shared_ptr c) const { - return c->inListeners; -} - void QueueListeners::ListenerSet::notifyAll() { std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify)); diff --git a/cpp/src/qpid/broker/QueueListeners.h b/cpp/src/qpid/broker/QueueListeners.h index 0659499253..ca844fd47e 100644 --- a/cpp/src/qpid/broker/QueueListeners.h +++ b/cpp/src/qpid/broker/QueueListeners.h @@ -30,7 +30,7 @@ namespace broker { /** * Track and notify components that wish to be notified of messages * that become available on a queue. - * + * * None of the methods defined here are protected by locking. However * the populate method allows a 'snapshot' to be taken of the * listeners to be notified. NotificationSet::notify() may then be @@ -61,11 +61,10 @@ class QueueListeners friend class QueueListeners; }; - void addListener(Consumer::shared_ptr); - void removeListener(Consumer::shared_ptr); + void addListener(Consumer::shared_ptr); + void removeListener(Consumer::shared_ptr); void populate(NotificationSet&); void snapshot(ListenerSet&); - bool contains(Consumer::shared_ptr c) const; void notifyAll(); template void eachListener(F f) { diff --git a/cpp/src/qpid/broker/QueuedMessage.cpp b/cpp/src/qpid/broker/QueuedMessage.cpp new file mode 100644 index 0000000000..d40cc901ff --- /dev/null +++ b/cpp/src/qpid/broker/QueuedMessage.cpp @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueuedMessage.h" +#include "Queue.h" +#include + +namespace qpid { +namespace broker { + +std::ostream& operator<<(std::ostream& o, const QueuedMessage& qm) { + o << (qm.queue ? qm.queue->getName() : std::string()) << "[" << qm.position <<"]"; + return o; +} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h index 806da8e720..9d008193a0 100644 --- a/cpp/src/qpid/broker/QueuedMessage.h +++ b/cpp/src/qpid/broker/QueuedMessage.h @@ -22,6 +22,8 @@ #define _QueuedMessage_ #include "qpid/broker/Message.h" +#include "BrokerImportExport.h" +#include namespace qpid { namespace broker { @@ -47,6 +49,7 @@ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } +QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuedMessage&); }} diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index d7adbd68ab..80fa5e1c0e 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/FieldValue.h" #include "qpid/sys/SecuritySettings.h" #include diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index e7d2259c80..64924bdd4c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -489,14 +489,14 @@ void SemanticState::route(intrusive_ptr msg, Deliverable& strategy) { exchangeName << " with routing-key " << msg->getRoutingKey())); } - cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + cacheExchange->route(strategy); if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + cacheExchange->getAlternate()->route(strategy); } if (!strategy.delivered) { msg->destroy(); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 5a83fd0fb3..e5e1d2da16 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -22,6 +22,7 @@ * */ +#include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Credit.h" #include "qpid/broker/Deliverable.h" @@ -39,7 +40,6 @@ #include "qpid/sys/AggregateOutput.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicValue.h" -#include "qpid/broker/AclModule.h" #include "qmf/org/apache/qpid/broker/Subscription.h" #include @@ -99,42 +99,44 @@ class SemanticState : private boost::noncopyable { bool haveCredit(); protected: - virtual bool doDispatch(); + QPID_BROKER_EXTERN virtual bool doDispatch(); size_t unacked() { return parent->unacked.size(); } public: typedef boost::shared_ptr shared_ptr; - ConsumerImpl(SemanticState* parent, - const std::string& name, boost::shared_ptr queue, - bool ack, bool acquire, bool exclusive, - const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - virtual ~ConsumerImpl(); - OwnershipToken* getSession(); - virtual bool deliver(QueuedMessage& msg); - bool filter(boost::intrusive_ptr msg); - bool accept(boost::intrusive_ptr msg); - void cancel() {} - - void disableNotify(); - void enableNotify(); - void notify(); - bool isNotifyEnabled() const; - - void requestDispatch(); - - void setWindowMode(); - void setCreditMode(); - void addByteCredit(uint32_t value); - void addMessageCredit(uint32_t value); - void flush(); - void stop(); - void complete(DeliveryRecord&); + QPID_BROKER_EXTERN ConsumerImpl( + SemanticState* parent, + const std::string& name, boost::shared_ptr queue, + bool ack, bool acquire, bool exclusive, + const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + QPID_BROKER_EXTERN virtual ~ConsumerImpl(); + QPID_BROKER_EXTERN OwnershipToken* getSession(); + QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg); + QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr msg); + QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr msg); + QPID_BROKER_EXTERN void cancel() {} + + QPID_BROKER_EXTERN void disableNotify(); + QPID_BROKER_EXTERN void enableNotify(); + QPID_BROKER_EXTERN void notify(); + QPID_BROKER_EXTERN bool isNotifyEnabled() const; + + QPID_BROKER_EXTERN void requestDispatch(); + + QPID_BROKER_EXTERN void setWindowMode(); + QPID_BROKER_EXTERN void setCreditMode(); + QPID_BROKER_EXTERN void addByteCredit(uint32_t value); + QPID_BROKER_EXTERN void addMessageCredit(uint32_t value); + QPID_BROKER_EXTERN void flush(); + QPID_BROKER_EXTERN void stop(); + QPID_BROKER_EXTERN void complete(DeliveryRecord&); boost::shared_ptr getQueue() const { return queue; } bool isBlocked() const { return blocked; } bool setBlocked(bool set) { std::swap(set, blocked); return set; } - bool doOutput(); + QPID_BROKER_EXTERN bool doOutput(); Credit& getCredit() { return credit; } const Credit& getCredit() const { return credit; } @@ -152,8 +154,11 @@ class SemanticState : private boost::noncopyable { void acknowledged(const broker::QueuedMessage&) {} // manageable entry points - management::ManagementObject* GetManagementObject (void) const; - management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + QPID_BROKER_EXTERN management::ManagementObject* + GetManagementObject(void) const; + + QPID_BROKER_EXTERN management::Manageable::status_t + ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); }; typedef std::map DtxBufferMap; diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 4aad46f782..78f2e43ce0 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -21,8 +21,9 @@ #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" -#include "qpid/log/Statement.h" +#include "qpid/framing/FieldValue.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" #include "qpid/broker/SessionState.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -73,18 +74,12 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const if(passive){ AclModule* acl = getBroker().getAcl(); if (acl) { - //TODO: why does a passive declare require create - //permission? The purpose of the passive flag is to state - //that the exchange should *not* created. For - //authorisation a passive declare is similar to - //exchange-query. std::map params; params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId())); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchange,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange access request from " << getConnection().getUserId())); } Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); @@ -274,22 +269,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (passive && !name.empty()) { AclModule* acl = getBroker().getAcl(); if (acl) { - //TODO: why does a passive declare require create - //permission? The purpose of the passive flag is to state - //that the queue should *not* created. For - //authorisation a passive declare is similar to - //queue-query (or indeed a qmf query). std::map params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast(arguments.getAsInt("qpid.max_count")))); params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast(arguments.getAsInt64("qpid.max_size")))); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,¶ms) ) + throw UnauthorizedAccessException(QPID_MSG("ACL denied queue access request from " << getConnection().getUserId())); } queue = getQueue(name); //TODO: check alternate-exchange is as expected @@ -409,6 +398,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA. if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " << queue->getName())); @@ -548,13 +538,6 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } -std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid) -{ - std::string encoded; - encode(xid, encoded); - return encoded; -} - void SessionAdapter::DtxHandlerImpl::select() { state.selectDtx(); @@ -566,7 +549,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, { try { if (fail) { - state.endDtx(convert(xid), true); + state.endDtx(DtxManager::convert(xid), true); if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { @@ -574,9 +557,9 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, } } else { if (suspend) { - state.suspendDtx(convert(xid)); + state.suspendDtx(DtxManager::convert(xid)); } else { - state.endDtx(convert(xid), false); + state.endDtx(DtxManager::convert(xid), false); } return XaResult(XA_STATUS_XA_OK); } @@ -594,9 +577,9 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, } try { if (resume) { - state.resumeDtx(convert(xid)); + state.resumeDtx(DtxManager::convert(xid)); } else { - state.startDtx(convert(xid), getBroker().getDtxManager(), join); + state.startDtx(DtxManager::convert(xid), getBroker().getDtxManager(), join); } return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { @@ -607,7 +590,7 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { - bool ok = getBroker().getDtxManager().prepare(convert(xid)); + bool ok = getBroker().getDtxManager().prepare(DtxManager::convert(xid)); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { return XaResult(XA_STATUS_XA_RBTIMEOUT); @@ -618,7 +601,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool onePhase) { try { - bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); + bool ok = getBroker().getDtxManager().commit(DtxManager::convert(xid), onePhase); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { return XaResult(XA_STATUS_XA_RBTIMEOUT); @@ -629,7 +612,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { - getBroker().getDtxManager().rollback(convert(xid)); + getBroker().getDtxManager().rollback(DtxManager::convert(xid)); return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { return XaResult(XA_STATUS_XA_RBTIMEOUT); @@ -659,7 +642,7 @@ void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { - uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); + uint32_t timeout = getBroker().getDtxManager().getTimeout(DtxManager::convert(xid)); return DtxGetTimeoutResult(timeout); } @@ -667,7 +650,7 @@ DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, uint32_t timeout) { - getBroker().getDtxManager().setTimeout(convert(xid), timeout); + getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout); } diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 8987c4812f..bc056538b1 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -226,10 +226,8 @@ class Queue; void rollback(); }; - class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper + class DtxHandlerImpl : public DtxHandler, public HandlerHelper { - std::string convert(const framing::Xid& xid); - public: DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 752fa55535..b58c7c01c5 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -64,6 +64,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); + if (detachedCallback) detachedCallback(); connection.closeChannel(channel.get()); } @@ -117,4 +118,8 @@ void SessionHandler::attached(const std::string& name) } } +void SessionHandler::setDetachedCallback(boost::function cb) { + detachedCallback = cb; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 8cd5072574..4e2cfaa963 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,6 +25,7 @@ #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include namespace qpid { class SessionState; @@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { * This proxy is for sending such commands. In a clustered broker it will take steps * to synchronize command order across the cluster. In a stand-alone broker * it is just a synonym for getProxy() - */ + */ framing::AMQP_ClientProxy& getClusterOrderProxy() { return clusterOrderProxy.get() ? *clusterOrderProxy : proxy; } @@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10::SessionHandler { void attached(const std::string& name);//used by 'pushing' inter-broker bridges void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges + void setDetachedCallback(boost::function cb); + protected: virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); @@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy proxy; std::auto_ptr session; std::auto_ptr clusterOrderProxy; + boost::function detachedCallback; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 644a3d628e..dd3ec13019 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -350,8 +350,9 @@ TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queu return (q != qv.end()) ? bk : 0; } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +void TopicExchange::route(Deliverable& msg) { + const string& routingKey = msg.getMessage().getRoutingKey(); // Note: PERFORMANCE CRITICAL!!! BindingList b; std::map::iterator it; diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 636918f8a1..cc24e1411e 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -185,9 +185,7 @@ class TopicExchange : public virtual Exchange { virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, diff --git a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp index 2acc09cded..a38e6ac12a 100644 --- a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/FieldValue.h" #include -- cgit v1.2.1