summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/broker
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AclModule.h191
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp15
-rw-r--r--cpp/src/qpid/broker/Bridge.h13
-rw-r--r--cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--cpp/src/qpid/broker/Broker.h87
-rw-r--r--cpp/src/qpid/broker/Connection.cpp16
-rw-r--r--cpp/src/qpid/broker/Connection.h5
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp1
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp29
-rw-r--r--cpp/src/qpid/broker/DtxManager.h3
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp11
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp28
-rw-r--r--cpp/src/qpid/broker/Exchange.h2
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp3
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h6
-rw-r--r--cpp/src/qpid/broker/Fairshare.cpp1
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--cpp/src/qpid/broker/LegacyLVQ.cpp29
-rw-r--r--cpp/src/qpid/broker/LegacyLVQ.h1
-rw-r--r--cpp/src/qpid/broker/Link.cpp263
-rw-r--r--cpp/src/qpid/broker/Link.h58
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp31
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h110
-rw-r--r--cpp/src/qpid/broker/Message.cpp8
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp20
-rw-r--r--cpp/src/qpid/broker/MessageDeque.h7
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp92
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.h19
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp78
-rw-r--r--cpp/src/qpid/broker/MessageMap.h2
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp118
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h27
-rw-r--r--cpp/src/qpid/broker/Queue.cpp667
-rw-r--r--cpp/src/qpid/broker/Queue.h147
-rw-r--r--cpp/src/qpid/broker/QueueListeners.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueListeners.h7
-rw-r--r--cpp/src/qpid/broker/QueuedMessage.cpp34
-rw-r--r--cpp/src/qpid/broker/QueuedMessage.h3
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp1
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--cpp/src/qpid/broker/SemanticState.h65
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp51
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h4
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h10
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--cpp/src/qpid/broker/windows/SaslAuthenticator.cpp1
54 files changed, 1528 insertions, 784 deletions
diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h
index e32ff266b9..ff9281b6fc 100644
--- a/cpp/src/qpid/broker/AclModule.h
+++ b/cpp/src/qpid/broker/AclModule.h
@@ -22,6 +22,7 @@
#include "qpid/RefCounted.h"
+#include "qpid/Exception.h"
#include <boost/shared_ptr.hpp>
#include <map>
#include <set>
@@ -32,17 +33,81 @@ namespace qpid {
namespace acl {
- enum ObjectType {OBJ_QUEUE, OBJ_EXCHANGE, OBJ_BROKER, OBJ_LINK,
- OBJ_METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list
- enum Action {ACT_CONSUME, ACT_PUBLISH, ACT_CREATE, ACT_ACCESS, ACT_BIND,
- ACT_UNBIND, ACT_DELETE, ACT_PURGE, ACT_UPDATE,
- ACTIONSIZE}; // ACTIONSIZE must be last in list
- enum Property {PROP_NAME, PROP_DURABLE, PROP_OWNER, PROP_ROUTINGKEY,
- PROP_PASSIVE, PROP_AUTODELETE, PROP_EXCLUSIVE, PROP_TYPE,
- PROP_ALTERNATE, PROP_QUEUENAME, PROP_SCHEMAPACKAGE,
- PROP_SCHEMACLASS, PROP_POLICYTYPE, PROP_MAXQUEUESIZE,
- PROP_MAXQUEUECOUNT};
- enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG};
+ // Interface enumerations.
+ // These enumerations define enum lists and implied text strings
+ // to match. They are used in two areas:
+ // 1. In the ACL specifications in the ACL file, file parsing, and
+ // internal rule storage.
+ // 2. In the authorize interface in the rest of the broker where
+ // code requests the ACL module to authorize an action.
+
+ // ObjectType shared between ACL spec and ACL authorise interface
+ enum ObjectType {
+ OBJ_QUEUE,
+ OBJ_EXCHANGE,
+ OBJ_BROKER,
+ OBJ_LINK,
+ OBJ_METHOD,
+ OBJECTSIZE }; // OBJECTSIZE must be last in list
+
+ // Action shared between ACL spec and ACL authorise interface
+ enum Action {
+ ACT_CONSUME,
+ ACT_PUBLISH,
+ ACT_CREATE,
+ ACT_ACCESS,
+ ACT_BIND,
+ ACT_UNBIND,
+ ACT_DELETE,
+ ACT_PURGE,
+ ACT_UPDATE,
+ ACTIONSIZE }; // ACTIONSIZE must be last in list
+
+ // Property used in ACL authorize interface
+ enum Property {
+ PROP_NAME,
+ PROP_DURABLE,
+ PROP_OWNER,
+ PROP_ROUTINGKEY,
+ PROP_AUTODELETE,
+ PROP_EXCLUSIVE,
+ PROP_TYPE,
+ PROP_ALTERNATE,
+ PROP_QUEUENAME,
+ PROP_SCHEMAPACKAGE,
+ PROP_SCHEMACLASS,
+ PROP_POLICYTYPE,
+ PROP_MAXQUEUESIZE,
+ PROP_MAXQUEUECOUNT };
+
+ // Property used in ACL spec file
+ // Note for properties common to file processing/rule storage and to
+ // broker rule lookups the identical enum values are used.
+ enum SpecProperty {
+ SPECPROP_NAME = PROP_NAME,
+ SPECPROP_DURABLE = PROP_DURABLE,
+ SPECPROP_OWNER = PROP_OWNER,
+ SPECPROP_ROUTINGKEY = PROP_ROUTINGKEY,
+ SPECPROP_AUTODELETE = PROP_AUTODELETE,
+ SPECPROP_EXCLUSIVE = PROP_EXCLUSIVE,
+ SPECPROP_TYPE = PROP_TYPE,
+ SPECPROP_ALTERNATE = PROP_ALTERNATE,
+ SPECPROP_QUEUENAME = PROP_QUEUENAME,
+ SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE,
+ SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS,
+ SPECPROP_POLICYTYPE = PROP_POLICYTYPE,
+
+ SPECPROP_MAXQUEUESIZELOWERLIMIT,
+ SPECPROP_MAXQUEUESIZEUPPERLIMIT,
+ SPECPROP_MAXQUEUECOUNTLOWERLIMIT,
+ SPECPROP_MAXQUEUECOUNTUPPERLIMIT };
+
+// AclResult shared between ACL spec and ACL authorise interface
+ enum AclResult {
+ ALLOW,
+ ALLOWLOG,
+ DENY,
+ DENYLOG };
} // namespace acl
@@ -54,14 +119,25 @@ namespace broker {
public:
- // effienty turn off ACL on message transfer.
+ // Some ACLs are invoked on every message transfer.
+ // doTransferAcl pervents time consuming ACL calls on a per-message basis.
virtual bool doTransferAcl()=0;
- virtual bool authorise(const std::string& id, const acl::Action& action, const acl::ObjectType& objType, const std::string& name,
+ virtual bool authorise(
+ const std::string& id,
+ const acl::Action& action,
+ const acl::ObjectType& objType,
+ const std::string& name,
std::map<acl::Property, std::string>* params=0)=0;
- virtual bool authorise(const std::string& id, const acl::Action& action, const acl::ObjectType& objType, const std::string& ExchangeName,
- const std::string& RoutingKey)=0;
- // create specilied authorise methods for cases that need faster matching as needed.
+
+ virtual bool authorise(
+ const std::string& id,
+ const acl::Action& action,
+ const acl::ObjectType& objType,
+ const std::string& ExchangeName,
+ const std::string& RoutingKey)=0;
+
+ // Add specialized authorise() methods as required.
virtual ~AclModule() {};
};
@@ -79,7 +155,7 @@ namespace acl {
if (str.compare("broker") == 0) return OBJ_BROKER;
if (str.compare("link") == 0) return OBJ_LINK;
if (str.compare("method") == 0) return OBJ_METHOD;
- throw str;
+ throw qpid::Exception(str);
}
static inline std::string getObjectTypeStr(const ObjectType o) {
switch (o) {
@@ -102,7 +178,7 @@ namespace acl {
if (str.compare("delete") == 0) return ACT_DELETE;
if (str.compare("purge") == 0) return ACT_PURGE;
if (str.compare("update") == 0) return ACT_UPDATE;
- throw str;
+ throw qpid::Exception(str);
}
static inline std::string getActionStr(const Action a) {
switch (a) {
@@ -124,7 +200,6 @@ namespace acl {
if (str.compare("durable") == 0) return PROP_DURABLE;
if (str.compare("owner") == 0) return PROP_OWNER;
if (str.compare("routingkey") == 0) return PROP_ROUTINGKEY;
- if (str.compare("passive") == 0) return PROP_PASSIVE;
if (str.compare("autodelete") == 0) return PROP_AUTODELETE;
if (str.compare("exclusive") == 0) return PROP_EXCLUSIVE;
if (str.compare("type") == 0) return PROP_TYPE;
@@ -134,8 +209,8 @@ namespace acl {
if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return PROP_POLICYTYPE;
if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE;
- if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT;
- throw str;
+ if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT;
+ throw qpid::Exception(str);
}
static inline std::string getPropertyStr(const Property p) {
switch (p) {
@@ -143,7 +218,6 @@ namespace acl {
case PROP_DURABLE: return "durable";
case PROP_OWNER: return "owner";
case PROP_ROUTINGKEY: return "routingkey";
- case PROP_PASSIVE: return "passive";
case PROP_AUTODELETE: return "autodelete";
case PROP_EXCLUSIVE: return "exclusive";
case PROP_TYPE: return "type";
@@ -153,17 +227,61 @@ namespace acl {
case PROP_SCHEMACLASS: return "schemaclass";
case PROP_POLICYTYPE: return "policytype";
case PROP_MAXQUEUESIZE: return "maxqueuesize";
- case PROP_MAXQUEUECOUNT: return "maxqueuecount";
+ case PROP_MAXQUEUECOUNT: return "maxqueuecount";
default: assert(false); // should never get here
}
return "";
}
+ static inline SpecProperty getSpecProperty(const std::string& str) {
+ if (str.compare("name") == 0) return SPECPROP_NAME;
+ if (str.compare("durable") == 0) return SPECPROP_DURABLE;
+ if (str.compare("owner") == 0) return SPECPROP_OWNER;
+ if (str.compare("routingkey") == 0) return SPECPROP_ROUTINGKEY;
+ if (str.compare("autodelete") == 0) return SPECPROP_AUTODELETE;
+ if (str.compare("exclusive") == 0) return SPECPROP_EXCLUSIVE;
+ if (str.compare("type") == 0) return SPECPROP_TYPE;
+ if (str.compare("alternate") == 0) return SPECPROP_ALTERNATE;
+ if (str.compare("queuename") == 0) return SPECPROP_QUEUENAME;
+ if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE;
+ if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS;
+ if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE;
+ if (str.compare("queuemaxsizelowerlimit") == 0) return SPECPROP_MAXQUEUESIZELOWERLIMIT;
+ if (str.compare("queuemaxsizeupperlimit") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
+ if (str.compare("queuemaxcountlowerlimit") == 0) return SPECPROP_MAXQUEUECOUNTLOWERLIMIT;
+ if (str.compare("queuemaxcountupperlimit") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT;
+ // Allow old names in ACL file as aliases for newly-named properties
+ if (str.compare("maxqueuesize") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
+ if (str.compare("maxqueuecount") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT;
+ throw qpid::Exception(str);
+ }
+ static inline std::string getPropertyStr(const SpecProperty p) {
+ switch (p) {
+ case SPECPROP_NAME: return "name";
+ case SPECPROP_DURABLE: return "durable";
+ case SPECPROP_OWNER: return "owner";
+ case SPECPROP_ROUTINGKEY: return "routingkey";
+ case SPECPROP_AUTODELETE: return "autodelete";
+ case SPECPROP_EXCLUSIVE: return "exclusive";
+ case SPECPROP_TYPE: return "type";
+ case SPECPROP_ALTERNATE: return "alternate";
+ case SPECPROP_QUEUENAME: return "queuename";
+ case SPECPROP_SCHEMAPACKAGE: return "schemapackage";
+ case SPECPROP_SCHEMACLASS: return "schemaclass";
+ case SPECPROP_POLICYTYPE: return "policytype";
+ case SPECPROP_MAXQUEUESIZELOWERLIMIT: return "queuemaxsizelowerlimit";
+ case SPECPROP_MAXQUEUESIZEUPPERLIMIT: return "queuemaxsizeupperlimit";
+ case SPECPROP_MAXQUEUECOUNTLOWERLIMIT: return "queuemaxcountlowerlimit";
+ case SPECPROP_MAXQUEUECOUNTUPPERLIMIT: return "queuemaxcountupperlimit";
+ default: assert(false); // should never get here
+ }
+ return "";
+ }
static inline AclResult getAclResult(const std::string& str) {
if (str.compare("allow") == 0) return ALLOW;
if (str.compare("allow-log") == 0) return ALLOWLOG;
if (str.compare("deny") == 0) return DENY;
if (str.compare("deny-log") == 0) return DENYLOG;
- throw str;
+ throw qpid::Exception(str);
}
static inline std::string getAclResultStr(const AclResult r) {
switch (r) {
@@ -187,8 +305,11 @@ namespace acl {
typedef boost::shared_ptr<objectMap> objectMapPtr;
typedef std::map<Property, std::string> propMap;
typedef propMap::const_iterator propMapItr;
+ typedef std::map<SpecProperty, std::string> specPropMap;
+ typedef specPropMap::const_iterator specPropMapItr;
- // This map contains the legal combinations of object/action/properties found in an ACL file
+ // This map contains the legal combinations of object/action/properties
+ // found in an ACL file
static void loadValidationMap(objectMapPtr& map) {
if (!map.get()) return;
map->clear();
@@ -199,7 +320,6 @@ namespace acl {
propSetPtr p1(new propSet);
p1->insert(PROP_TYPE);
p1->insert(PROP_ALTERNATE);
- p1->insert(PROP_PASSIVE);
p1->insert(PROP_DURABLE);
propSetPtr p2(new propSet);
@@ -224,7 +344,6 @@ namespace acl {
propSetPtr p4(new propSet);
p4->insert(PROP_ALTERNATE);
- p4->insert(PROP_PASSIVE);
p4->insert(PROP_DURABLE);
p4->insert(PROP_EXCLUSIVE);
p4->insert(PROP_AUTODELETE);
@@ -260,21 +379,31 @@ namespace acl {
map->insert(objectPair(OBJ_METHOD, a4));
}
- static std::string propertyMapToString(const std::map<Property, std::string>* params) {
+ //
+ // properyMapToString
+ //
+ template <typename T>
+ static std::string propertyMapToString(
+ const std::map<T, std::string>* params)
+ {
std::ostringstream ss;
ss << "{";
if (params)
{
- for (propMapItr pMItr = params->begin(); pMItr != params->end(); pMItr++) {
- ss << " " << getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+ for (typename std::map<T, std::string>::const_iterator
+ pMItr = params->begin(); pMItr != params->end(); pMItr++)
+ {
+ ss << " " << getPropertyStr((T) pMItr-> first)
+ << "=" << pMItr->second;
}
}
ss << " }";
return ss.str();
}
+
};
-
+
}} // namespace qpid::acl
#endif // QPID_ACLMODULE_ACL_H
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 9a1f4be468..5b531e4636 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
- initialize(init)
+ initialize(init), detached(false)
{
std::stringstream title;
title << id << "_" << name;
@@ -85,11 +85,14 @@ Bridge::~Bridge()
void Bridge::create(Connection& c)
{
+ detached = false; // Reset detached in case we are recovering.
connState = &c;
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(id);
+ sessionHandler.setDetachedCallback(
+ boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
@@ -179,12 +182,6 @@ void Bridge::destroy()
listener(this);
}
-bool Bridge::isSessionReady() const
-{
- SessionHandler& sessionHandler = conn->getChannel(id);
- return sessionHandler.ready();
-}
-
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
@@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() const
return link->getBroker()->getFederationTag();
}
+void Bridge::sessionDetached() {
+ detached = true;
+}
+
}}
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index b849b11ba8..32b9fd1781 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -33,6 +33,7 @@
#include "qmf/org/apache/qpid/broker/Bridge.h"
#include <boost/function.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <memory>
namespace qpid {
@@ -44,7 +45,10 @@ class Link;
class LinkRegistry;
class SessionHandler;
-class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
+class Bridge : public PersistableConfig,
+ public management::Manageable,
+ public Exchange::DynamicBridge,
+ public boost::enable_shared_from_this<Bridge>
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -63,7 +67,7 @@ public:
void destroy();
bool isDurable() { return args.i_durable; }
- bool isSessionReady() const;
+ bool isDetached() const { return detached; }
management::ManagementObject* GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
@@ -90,6 +94,9 @@ public:
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
private:
+ // Callback when the bridge's session is detached.
+ void sessionDetached();
+
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -112,7 +119,7 @@ private:
ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
-
+ bool detached; // Set when session is detached.
bool resetProxy();
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 0fd31580f6..f20cce18a2 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -111,6 +111,7 @@ Broker::Options::Options(const std::string& name) :
maxConnections(500),
connectionBacklog(10),
enableMgmt(1),
+ mgmtPublish(1),
mgmtPubInterval(10),
queueCleanInterval(60*10),//10 minutes
auth(SaslAuthenticator::available()),
@@ -148,6 +149,7 @@ Broker::Options::Options(const std::string& name) :
("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
// FIXME aconway 2012-02-13: consistent treatment of values in SECONDS
@@ -213,7 +215,7 @@ Broker::Broker(const Broker::Options& conf) :
try {
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
- managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
+ managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish,
conf.mgmtPubInterval, this, conf.workerThreads + 3);
managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
@@ -228,6 +230,7 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+ mgmtObject->set_mgmtPublish(conf.mgmtPublish);
mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
mgmtObject->set_dataDir(dataDir.getPath());
@@ -885,7 +888,6 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
@@ -956,7 +958,6 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index cff38eecdd..135b9340f9 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -106,6 +106,7 @@ public:
int maxConnections;
int connectionBacklog;
bool enableMgmt;
+ bool mgmtPublish;
uint16_t mgmtPubInterval;
uint16_t queueCleanInterval;
bool auth;
@@ -206,7 +207,7 @@ public:
ConsumerFactories consumerFactories;
public:
- virtual ~Broker();
+ QPID_BROKER_EXTERN virtual ~Broker();
QPID_BROKER_EXTERN Broker(const Options& configuration);
static QPID_BROKER_EXTERN boost::intrusive_ptr<Broker> create(const Options& configuration);
@@ -218,16 +219,16 @@ public:
* port, which will be different if the configured port is
* 0.
*/
- virtual uint16_t getPort(const std::string& name) const;
+ QPID_BROKER_EXTERN virtual uint16_t getPort(const std::string& name) const;
/**
* Run the broker. Implements Runnable::run() so the broker
* can be run in a separate thread.
*/
- virtual void run();
+ QPID_BROKER_EXTERN virtual void run();
/** Shut down the broker */
- virtual void shutdown();
+ QPID_BROKER_EXTERN virtual void shutdown();
QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
void setAsyncStore(boost::shared_ptr<AsyncStore>& asyncStore);
@@ -248,14 +249,14 @@ public:
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- management::ManagementObject* GetManagementObject (void) const;
- management::Manageable* GetVhostObject (void) const;
- management::Manageable::status_t ManagementMethod (uint32_t methodId,
- management::Args& args,
- std::string& text);
+ QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const;
+ QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
+ QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
+ uint32_t methodId, management::Args& args, std::string& text);
/** Add to the broker's protocolFactorys */
- void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
+ QPID_BROKER_EXTERN void registerProtocolFactory(
+ const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
/** Accept connections */
QPID_BROKER_EXTERN void accept();
@@ -273,15 +274,17 @@ public:
/** Move messages from one queue to another.
A zero quantity means to move all messages
*/
- uint32_t queueMoveMessages( const std::string& srcQueue,
- const std::string& destQueue,
- uint32_t qty,
- const qpid::types::Variant::Map& filter);
+ QPID_BROKER_EXTERN uint32_t queueMoveMessages(
+ const std::string& srcQueue,
+ const std::string& destQueue,
+ uint32_t qty,
+ const qpid::types::Variant::Map& filter);
- boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
+ QPID_BROKER_EXTERN boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(
+ const std::string& name = TCP_TRANSPORT) const;
/** Expose poller so plugins can register their descriptors. */
- boost::shared_ptr<sys::Poller> getPoller();
+ QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller();
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
@@ -291,7 +294,7 @@ public:
/** Timer for tasks that must be synchronized if we are in a cluster */
sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; }
- void setClusterTimer(std::auto_ptr<sys::Timer>);
+ QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr<sys::Timer>);
boost::function<std::vector<Url> ()> getKnownBrokers;
@@ -322,15 +325,14 @@ public:
* context.
*@return true if delivery of a message should be deferred.
*/
- boost::function<bool (const std::string& queue,
- const boost::intrusive_ptr<Message>& msg)> deferDelivery;
+ boost::function<bool (const std::string& queue, const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
bool isTimestamping() { return config.timestampRcvMsgs; }
typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
- std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+ QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> createQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -339,30 +341,39 @@ public:
const qpid::framing::FieldTable& arguments,
const std::string& userId,
const std::string& connectionId);
- void deleteQueue(const std::string& name,
- const std::string& userId,
- const std::string& connectionId,
- QueueFunctor check = QueueFunctor());
- std::pair<Exchange::shared_ptr, bool> createExchange(
+
+ QPID_BROKER_EXTERN void deleteQueue(
+ const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ QueueFunctor check = QueueFunctor());
+
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> createExchange(
const std::string& name,
const std::string& type,
bool durable,
const std::string& alternateExchange,
const qpid::framing::FieldTable& args,
const std::string& userId, const std::string& connectionId);
- void deleteExchange(const std::string& name, const std::string& userId,
- const std::string& connectionId);
- void bind(const std::string& queue,
- const std::string& exchange,
- const std::string& key,
- const qpid::framing::FieldTable& arguments,
- const std::string& userId,
- const std::string& connectionId);
- void unbind(const std::string& queue,
- const std::string& exchange,
- const std::string& key,
- const std::string& userId,
- const std::string& connectionId);
+
+ QPID_BROKER_EXTERN void deleteExchange(
+ const std::string& name, const std::string& userId,
+ const std::string& connectionId);
+
+ QPID_BROKER_EXTERN void bind(
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId);
+
+ QPID_BROKER_EXTERN void unbind(
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const std::string& userId,
+ const std::string& connectionId);
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1e6aab217c..5e339cec03 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -185,11 +185,13 @@ void Connection::recordFromServer(const framing::AMQFrame& frame)
// Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0 && isClusterSafe())
{
- mgmtObject->inc_framesToClient();
- mgmtObject->inc_bytesToClient(frame.encodedSize());
+ qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+ cStats->framesToClient += 1;
+ cStats->bytesToClient += frame.encodedSize();
if (isMessage(frame.getMethod())) {
- mgmtObject->inc_msgsToClient();
+ cStats->msgsToClient += 1;
}
+ mgmtObject->statisticsUpdated();
}
}
@@ -198,11 +200,13 @@ void Connection::recordFromClient(const framing::AMQFrame& frame)
// Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0 && isClusterSafe())
{
- mgmtObject->inc_framesFromClient();
- mgmtObject->inc_bytesFromClient(frame.encodedSize());
+ qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+ cStats->framesFromClient += 1;
+ cStats->bytesFromClient += frame.encodedSize();
if (isMessage(frame.getMethod())) {
- mgmtObject->inc_msgsFromClient();
+ cStats->msgsFromClient += 1;
}
+ mgmtObject->statisticsUpdated();
}
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 858ab6f7f4..1b8bd83139 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -113,15 +113,20 @@ class Connection : public sys::ConnectionInputHandler,
void requestIOProcessing (boost::function0<void>);
void recordFromServer (const framing::AMQFrame& frame);
void recordFromClient (const framing::AMQFrame& frame);
+
+ // gets for configured federation links
std::string getAuthMechanism();
std::string getAuthCredentials();
std::string getUsername();
std::string getPassword();
std::string getHost();
uint16_t getPort();
+
void notifyConnectionForced(const std::string& text);
void setUserId(const std::string& uid);
void raiseConnectEvent();
+
+ // credentials for connected client
const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index f1d43c5cdb..6894324117 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -28,6 +28,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ConnectionStartOkBody.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/broker/AclModule.h"
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index 05c5f00c57..2e25543308 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -35,7 +35,6 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/Exception.h"
-#include "qpid/broker/AclModule.h"
#include "qpid/sys/SecurityLayer.h"
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 5591539853..5d9aea7509 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -153,8 +153,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void DirectExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
PreRoute pr(msg, this);
ConstBindingList b;
{
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index a6f9cf91af..833be52c1c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -57,9 +57,7 @@ public:
const std::string& routingKey,
const qpid::framing::FieldTable* args);
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index febd547478..d482c2c327 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/StructHelper.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
#include "qpid/ptr_map.h"
@@ -55,7 +56,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon
bool DtxManager::prepare(const std::string& xid)
{
- QPID_LOG(debug, "preparing: " << xid);
+ QPID_LOG(debug, "preparing: " << convert(xid));
try {
return getWork(xid)->prepare();
} catch (DtxTimeoutException& e) {
@@ -66,7 +67,7 @@ bool DtxManager::prepare(const std::string& xid)
bool DtxManager::commit(const std::string& xid, bool onePhase)
{
- QPID_LOG(debug, "committing: " << xid);
+ QPID_LOG(debug, "committing: " << convert(xid));
try {
bool result = getWork(xid)->commit(onePhase);
remove(xid);
@@ -79,7 +80,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase)
void DtxManager::rollback(const std::string& xid)
{
- QPID_LOG(debug, "rolling back: " << xid);
+ QPID_LOG(debug, "rolling back: " << convert(xid));
try {
getWork(xid)->rollback();
remove(xid);
@@ -94,7 +95,7 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
+ throw NotFoundException(QPID_MSG("Unrecognised xid " << convert(xid)));
}
return ptr_map_ptr(i);
}
@@ -109,7 +110,7 @@ void DtxManager::remove(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
+ throw NotFoundException(QPID_MSG("Unrecognised xid " << convert(xid)));
} else {
work.erase(i);
}
@@ -120,7 +121,7 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
- throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
+ throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)"));
} else {
std::string ncxid = xid; // Work around const correctness problems in ptr_map.
return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
@@ -175,3 +176,19 @@ void DtxManager::setStore (TransactionalStore* _store)
{
store = _store;
}
+
+std::string DtxManager::convert(const qpid::framing::Xid& xid)
+{
+ qpid::framing::StructHelper helper;
+ std::string encoded;
+ helper.encode(xid, encoded);
+ return encoded;
+}
+
+qpid::framing::Xid DtxManager::convert(const std::string& xid)
+{
+ qpid::framing::StructHelper helper;
+ qpid::framing::Xid decoded;
+ helper.decode(decoded, xid);
+ return decoded;
+}
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index 11895695a3..6f03189f66 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -26,6 +26,7 @@
#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Xid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/ptr_map.h"
@@ -74,6 +75,8 @@ public:
}
DtxWorkRecord* getWork(const std::string& xid);
bool exists(const std::string& xid);
+ static std::string convert(const framing::Xid& xid);
+ static framing::Xid convert(const std::string& xid);
};
}
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index a413fe418d..2c26fec49f 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/DtxWorkRecord.h"
+#include "qpid/broker/DtxManager.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
@@ -73,7 +74,7 @@ bool DtxWorkRecord::commit(bool onePhase)
if (prepared) {
//already prepared i.e. 2pc
if (onePhase) {
- throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been prepared, one-phase option not valid!"));
}
store->commit(*txn);
@@ -84,7 +85,7 @@ bool DtxWorkRecord::commit(bool onePhase)
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -116,10 +117,10 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
Mutex::ScopedLock locker(lock);
if (expired) {
- throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
+ throw DtxTimeoutException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has timed out."));
}
if (completed) {
- throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " has been completed!"));
}
work.push_back(ops);
}
@@ -133,7 +134,7 @@ bool DtxWorkRecord::check()
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
- throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " not completed!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << DtxManager::convert(xid) << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index ecaa492903..8d20b0df81 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -32,7 +32,9 @@
#include "qpid/sys/ExceptionHolder.h"
#include <stdexcept>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
@@ -135,20 +137,23 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
if (mgmtExchange != 0)
{
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
+ qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
+ uint64_t contentSize = msg.contentSize();
+
+ eStats->msgReceives += 1;
+ eStats->byteReceives += contentSize;
if (count == 0)
{
//QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
+ eStats->msgDrops += 1;
+ eStats->byteDrops += contentSize;
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsNoRoute();
}
else
{
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ eStats->msgRoutes += count;
+ eStats->byteRoutes += count * contentSize;
}
}
}
@@ -156,7 +161,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
- route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ route(dmsg);
}
}
@@ -399,9 +404,12 @@ void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
bool Exchange::routeWithAlternate(Deliverable& msg)
{
- route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ route(msg);
if (!msg.delivered && alternate) {
- alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ alternate->route(msg);
}
return msg.delivered;
}
+
+}}
+
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 9179dd5c7c..7376f814ed 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -196,7 +196,7 @@ public:
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg) = 0;
//PersistableExchange:
QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index fca77f7ddd..43d7268dfb 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Link.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
@@ -58,6 +59,8 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
}else if (type == ManagementTopicExchange::typeName) {
exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+ }else if (type == Link::exchangeTypeName) {
+ exchange = Link::linkExchangeFactory(name);
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 90ef81b49e..27b705fbe5 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -54,7 +54,7 @@ class ExchangeRegistry{
bool durable,
const qpid::framing::FieldTable& args = framing::FieldTable());
QPID_BROKER_EXTERN void destroy(const std::string& name);
- Exchange::shared_ptr getDefault();
+ QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
/**
* Find the named exchange. Return 0 if not found.
@@ -75,7 +75,7 @@ class ExchangeRegistry{
/** Register an exchange instance.
*@return true if registered, false if exchange with same name is already registered.
*/
- bool registerExchange(const Exchange::shared_ptr&);
+ QPID_BROKER_EXTERN bool registerExchange(const Exchange::shared_ptr&);
QPID_BROKER_EXTERN void registerType(const std::string& type, FactoryFunction);
@@ -85,7 +85,7 @@ class ExchangeRegistry{
for (ExchangeMap::const_iterator i = exchanges.begin(); i != exchanges.end(); ++i)
f(i->second);
}
-
+
private:
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
typedef std::map<std::string, FactoryFunction > FunctionMap;
diff --git a/cpp/src/qpid/broker/Fairshare.cpp b/cpp/src/qpid/broker/Fairshare.cpp
index 313aa746f1..7cdad1a44f 100644
--- a/cpp/src/qpid/broker/Fairshare.cpp
+++ b/cpp/src/qpid/broker/Fairshare.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 5879fa0892..2bce99b6fe 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -101,7 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+void FanOutExchange::route(Deliverable& msg)
{
PreRoute pr(msg, this);
doRoute(msg, bindings.snapshot());
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 1a7d486796..c979fdca25 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -54,9 +54,7 @@ class FanOutExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 142c23f276..6648ae0422 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -191,8 +191,9 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+void HeadersExchange::route(Deliverable& msg)
{
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 3b939d6851..d10892b9cc 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -98,9 +98,7 @@ class HeadersExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/cpp/src/qpid/broker/LegacyLVQ.cpp b/cpp/src/qpid/broker/LegacyLVQ.cpp
index 49c0a32c19..f1deddf4c8 100644
--- a/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -28,16 +28,26 @@ namespace broker {
LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
void LegacyLVQ::setNoBrowse(bool b)
-{
+{
noBrowse = b;
}
+bool LegacyLVQ::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.payload == message.payload) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.payload == message.payload) {
+ if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
}
const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{
+{
//add the new message into the original position of the replaced message
Ordering::iterator i = messages.find(original.position);
- i->second = update;
- i->second.position = original.position;
- return i->second;
+ if (i != messages.end()) {
+ i->second = update;
+ i->second.position = original.position;
+ return i->second;
+ } else {
+ QPID_LOG(error, "Failed to replace message at " << original.position);
+ return update;
+ }
}
void LegacyLVQ::removeIf(Predicate p)
diff --git a/cpp/src/qpid/broker/LegacyLVQ.h b/cpp/src/qpid/broker/LegacyLVQ.h
index 695e51131d..9355069f37 100644
--- a/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/cpp/src/qpid/broker/LegacyLVQ.h
@@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap
{
public:
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+ bool deleted(const QueuedMessage&);
bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool push(const QueuedMessage& added, QueuedMessage& removed);
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 4af1e6d6bd..f21c861149 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -31,6 +31,8 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/UrlArray.h"
namespace qpid {
namespace broker {
@@ -48,6 +50,13 @@ using std::stringstream;
using std::string;
namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+namespace {
+ const std::string FAILOVER_EXCHANGE("amq.failover");
+ const std::string FAILOVER_HEADER_KEY("amq.failover");
+}
+
+
struct LinkTimerTask : public sys::TimerTask {
LinkTimerTask(Link& l, sys::Timer& t)
: TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
@@ -65,6 +74,57 @@ struct LinkTimerTask : public sys::TimerTask {
sys::Timer& timer;
};
+
+
+/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
+ */
+class LinkExchange : public broker::Exchange
+{
+public:
+ LinkExchange(const std::string& name) : Exchange(name), link(0) {}
+ ~LinkExchange() {};
+ std::string getType() const { return Link::exchangeTypeName; }
+
+ // Exchange methods - set up to prevent binding/unbinding etc from clients!
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
+
+ // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
+ // and saving them should the Link need to reconnect.
+ void route(broker::Deliverable& msg)
+ {
+ if (!link) return;
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ framing::Array addresses;
+ if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
+ // convert the Array of addresses to a single Url container for used with setUrl():
+ std::vector<Url> urlVec;
+ Url urls;
+ urlVec = urlArrayToVector(addresses);
+ for(size_t i = 0; i < urlVec.size(); ++i)
+ urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
+ QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+ link->setUrl(urls);
+ }
+ }
+
+ void setLink(Link *_link)
+ {
+ assert(!link);
+ link = _link;
+ }
+
+private:
+ Link *link;
+};
+
+
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name )
+{
+ return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
Link::Link(LinkRegistry* _links,
MessageStore* _store,
const string& _host,
@@ -76,8 +136,9 @@ Link::Link(LinkRegistry* _links,
const string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store),
+ configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
+ host(_host), port(_port), transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -88,7 +149,8 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
- timerTask(new LinkTimerTask(*this, broker->getTimer()))
+ timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failoverChannel(0)
{
if (parent != 0 && broker != 0)
{
@@ -106,15 +168,26 @@ Link::Link(LinkRegistry* _links,
startConnectionLH();
}
broker->getTimer().add(timerTask);
+
+ stringstream _name;
+ _name << "qpid.link." << transport << ":" << host << ":" << port;
+ std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
+ exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
}
Link::~Link ()
{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+ if (state == STATE_OPERATIONAL && connection != 0) {
+ closeConnection("closed by management");
+ }
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -180,11 +253,21 @@ void Link::established(Connection* c)
void Link::setUrl(const Url& u) {
+ QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u);
Mutex::ScopedLock mutex(lock);
url = u;
reconnectNext = 0;
}
+
+namespace {
+ /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
+ void sessionDetached(Link *link) {
+ QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
+ }
+}
+
+
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
@@ -198,37 +281,74 @@ void Link::opened() {
reconnectNext = 0;
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
+
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
+ FieldTable());
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
}
void Link::closed(int, std::string text)
{
- Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
-
- connection = 0;
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
- if (state == STATE_OPERATIONAL) {
- stringstream addr;
- addr << host << ":" << port;
- QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
- if (!hideManagement() && agent)
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
+ connection = 0;
+ if (state == STATE_OPERATIONAL) {
+ stringstream addr;
+ addr << host << ":" << port;
+ if (!hideManagement() && agent)
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
- if (state != STATE_FAILED && state != STATE_PASSIVE)
- {
- setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
+ if (state != STATE_FAILED && state != STATE_PASSIVE)
+ {
+ setStateLH(STATE_WAITING);
+ if (!hideManagement())
+ mgmtObject->set_lastError (text);
+ }
}
-
- if (closing)
+ // Call destroy outside of the lock, don't want to be deleted with lock held.
+ if (isClosing)
destroy();
}
@@ -239,10 +359,8 @@ void Link::destroy ()
{
Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
- connection = 0;
+ QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management");
+ closeConnection("closed by management");
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -263,7 +381,7 @@ void Link::destroy ()
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->destroy();
toDelete.clear();
- links->destroy (host, port);
+ links->destroy (configuredHost, configuredPort);
}
void Link::add(Bridge::shared_ptr bridge)
@@ -311,7 +429,7 @@ void Link::ioThreadProcessing()
// check for bridge session errors and recover
if (!active.empty()) {
Bridges::iterator removed = std::remove_if(
- active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1));
for (Bridges::iterator i = removed; i != active.end(); ++i) {
Bridge::shared_ptr bridge = *i;
bridge->closed();
@@ -398,14 +516,14 @@ bool Link::hideManagement() const {
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
-
+ if (channelCounter >= framing::CHANNEL_MAX)
+ channelCounter = 1;
return channelCounter++;
}
void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
-
setStateLH(STATE_FAILED);
if (!hideManagement())
mgmtObject->set_lastError(text);
@@ -418,7 +536,7 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return host;
+ return configuredHost;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
@@ -444,9 +562,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
+ buffer.putShortString(configuredHost);
+ buffer.putShort(configuredPort);
+ buffer.putShortString(configuredTransport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
@@ -455,10 +573,10 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return host.size() + 1 // short-string (host)
+ return configuredHost.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
- + transport.size() + 1 // short-string(transport)
+ + configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
@@ -513,7 +631,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
}
std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
+ links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
iargs.i_dynamic, iargs.i_sync);
@@ -542,4 +660,63 @@ void Link::setPassive(bool passive)
}
}
+
+/** utility to clean up connection resources correctly */
+void Link::closeConnection( const std::string& reason)
+{
+ if (connection != 0) {
+ // cancel our subscription to the failover exchange
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
+ connection = 0;
+ }
+}
+
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+ addr.protocol = transport;
+ addr.host = host;
+ addr.port = port;
+
+ return state == STATE_OPERATIONAL;
+}
+
+
+// FieldTable keys for internal state data
+namespace {
+ const std::string FAILOVER_ADDRESSES("failover-addresses");
+ const std::string FAILOVER_INDEX("failover-index");
+}
+
+void Link::getState(framing::FieldTable& state) const
+{
+ state.clear();
+ Mutex::ScopedLock mutex(lock);
+ if (!url.empty()) {
+ state.setString(FAILOVER_ADDRESSES, url.str());
+ state.setInt(FAILOVER_INDEX, reconnectNext);
+ }
+}
+
+void Link::setState(const framing::FieldTable& state)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (state.isSet(FAILOVER_ADDRESSES)) {
+ Url failovers(state.getAsString(FAILOVER_ADDRESSES));
+ setUrl(failovers);
+ }
+ if (state.isSet(FAILOVER_INDEX)) {
+ reconnectNext = state.getAsInt(FAILOVER_INDEX);
+ }
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 4085c3bfcf..a97fa48664 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -24,9 +24,11 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
@@ -45,15 +47,23 @@ namespace broker {
class LinkRegistry;
class Broker;
class Connection;
+class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
- sys::Mutex lock;
+ mutable sys::Mutex lock;
LinkRegistry* links;
MessageStore* store;
- std::string host;
- uint16_t port;
- std::string transport;
+
+ // these remain constant across failover - used to identify this link
+ const std::string configuredTransport;
+ const std::string configuredHost;
+ const uint16_t configuredPort;
+ // these reflect the current address of remote - will change during failover
+ std::string host;
+ uint16_t port;
+ std::string transport;
+
bool durable;
std::string authMechanism;
std::string username;
@@ -75,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
-
boost::intrusive_ptr<sys::TimerTask> timerTask;
+ boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ uint failoverChannel;
+ std::string failoverSession;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
@@ -94,6 +106,14 @@ class Link : public PersistableConfig, public management::Manageable {
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void established(Connection*); // Called when connection is create
+ void opened(); // Called when connection is open (after create)
+ void closed(int, std::string); // Called when connection goes away
+ void reconnectLH(const Address&); //called by LinkRegistry
+ void closeConnection(const std::string& reason);
+
+ friend class LinkRegistry; // to call established, opened, closed
+
public:
typedef boost::shared_ptr<Link> shared_ptr;
@@ -110,22 +130,25 @@ class Link : public PersistableConfig, public management::Manageable {
management::Manageable* parent = 0);
virtual ~Link();
- std::string getHost() { return host; }
- uint16_t getPort() { return port; }
- std::string getTransport() { return transport; }
+ /** these return the *configured* transport/host/port, which does not change over the
+ lifetime of the Link */
+ std::string getHost() const { return configuredHost; }
+ uint16_t getPort() const { return configuredPort; }
+ std::string getTransport() const { return configuredTransport; }
+
+ /** returns the current address of the remote, which may be different from the
+ configured transport/host/port due to failover. Returns true if connection is
+ active */
+ bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
uint nextChannel();
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
- void setUrl(const Url&); // Set URL for reconnection.
- void established(Connection*); // Called when connection is create
- void opened(); // Called when connection is open (after create)
- void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
- void close(); // Close the link from within the broker.
+ QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
+ QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
@@ -148,6 +171,13 @@ class Link : public PersistableConfig, public management::Manageable {
management::ManagementObject* GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+ // manage the exchange owned by this link
+ static const std::string exchangeTypeName;
+ static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
+
+ // replicate internal state of this Link for clustering
+ void getState(framing::FieldTable& state) const;
+ void setState(const framing::FieldTable& state);
};
}
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index bb602bb953..d89f220d1b 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -25,7 +25,9 @@
#include <iostream>
#include <boost/format.hpp>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::sys;
using std::string;
using std::pair;
@@ -45,16 +47,15 @@ LinkRegistry::LinkRegistry () :
{
}
-namespace {
-struct ConnectionObserverImpl : public ConnectionObserver {
+class LinkRegistryConnectionObserver : public ConnectionObserver {
LinkRegistry& links;
- ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+ public:
+ LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {}
void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
};
-}
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker),
@@ -62,7 +63,7 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
realm(broker->getOptions().realm)
{
broker->getConnectionObservers().add(
- boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
+ boost::shared_ptr<ConnectionObserver>(new LinkRegistryConnectionObserver(*this)));
}
LinkRegistry::~LinkRegistry() {}
@@ -298,22 +299,29 @@ std::string LinkRegistry::getUsername(const std::string& key)
return link->getUsername();
}
+/** note: returns the current remote host (may be different from the host originally
+ configured for the Link due to failover) */
std::string LinkRegistry::getHost(const std::string& key)
{
- Link::shared_ptr link = findLink(key);
- if (!link)
- return string();
+ Link::shared_ptr link = findLink(key);
+ if (!link)
+ return string();
- return link->getHost();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.host;
}
+/** returns the current remote port (ditto above) */
uint16_t LinkRegistry::getPort(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return 0;
- return link->getPort();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.port;
}
std::string LinkRegistry::getPassword(const std::string& key)
@@ -368,3 +376,4 @@ void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 753f6bfe9e..8e9d2f4b0d 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -23,6 +23,7 @@
*/
#include <map>
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/Address.h"
@@ -56,43 +57,50 @@ namespace broker {
static std::string createKey(const Address& address);
static std::string createKey(const std::string& host, uint16_t port);
+ // Methods called by the connection observer.
+ void notifyConnection (const std::string& key, Connection* c);
+ void notifyOpened (const std::string& key);
+ void notifyClosed (const std::string& key);
+ void notifyConnectionForced (const std::string& key, const std::string& text);
+ friend class LinkRegistryConnectionObserver;
+
public:
- LinkRegistry (); // Only used in store tests
- LinkRegistry (Broker* _broker);
- ~LinkRegistry();
-
- std::pair<boost::shared_ptr<Link>, bool>
- declare(const std::string& host,
- uint16_t port,
- const std::string& transport,
- bool durable,
- const std::string& authMechanism,
- const std::string& username,
- const std::string& password);
-
- std::pair<Bridge::shared_ptr, bool>
- declare(const std::string& host,
- uint16_t port,
- bool durable,
- const std::string& src,
- const std::string& dest,
- const std::string& key,
- bool isQueue,
- bool isLocal,
- const std::string& id,
- const std::string& excludes,
- bool dynamic,
- uint16_t sync,
- Bridge::InitializeCallback=0
- );
-
- void destroy(const std::string& host, const uint16_t port);
-
- void destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key);
+ QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
+ QPID_BROKER_EXTERN LinkRegistry (Broker* _broker);
+ QPID_BROKER_EXTERN ~LinkRegistry();
+
+ QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
+ declare(const std::string& host,
+ uint16_t port,
+ const std::string& transport,
+ bool durable,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password);
+
+ QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
+ declare(const std::string& host,
+ uint16_t port,
+ bool durable,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
+ bool isQueue,
+ bool isLocal,
+ const std::string& id,
+ const std::string& excludes,
+ bool dynamic,
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
+
+ QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
+
+ QPID_BROKER_EXTERN void destroy(const std::string& host,
+ const uint16_t port,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
@@ -102,24 +110,20 @@ namespace broker {
/**
* Set the store to use. May only be called once.
*/
- void setStore (MessageStore*);
+ QPID_BROKER_EXTERN void setStore (MessageStore*);
/**
* Return the message store used.
*/
- MessageStore* getStore() const;
+ QPID_BROKER_EXTERN MessageStore* getStore() const;
- void notifyConnection (const std::string& key, Connection* c);
- void notifyOpened (const std::string& key);
- void notifyClosed (const std::string& key);
- void notifyConnectionForced (const std::string& key, const std::string& text);
- std::string getAuthMechanism (const std::string& key);
- std::string getAuthCredentials (const std::string& key);
- std::string getAuthIdentity (const std::string& key);
- std::string getUsername (const std::string& key);
- std::string getPassword (const std::string& key);
- std::string getHost (const std::string& key);
- uint16_t getPort (const std::string& key);
+ QPID_BROKER_EXTERN std::string getAuthMechanism (const std::string& key);
+ QPID_BROKER_EXTERN std::string getAuthCredentials (const std::string& key);
+ QPID_BROKER_EXTERN std::string getAuthIdentity (const std::string& key);
+ QPID_BROKER_EXTERN std::string getUsername (const std::string& key);
+ QPID_BROKER_EXTERN std::string getPassword (const std::string& key);
+ QPID_BROKER_EXTERN std::string getHost (const std::string& key);
+ QPID_BROKER_EXTERN uint16_t getPort (const std::string& key);
/**
* Called by links failing over to new address
@@ -132,13 +136,13 @@ namespace broker {
* updated but links won't actually establish connections and
* bridges won't therefore pull or push any messages.
*/
- void setPassive(bool);
- bool isPassive() { return passive; }
+ QPID_BROKER_EXTERN void setPassive(bool);
+ QPID_BROKER_EXTERN bool isPassive() { return passive; }
/** Iterate over each link in the registry. Used for cluster updates. */
- void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+ QPID_BROKER_EXTERN void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
/** Iterate over each bridge in the registry. Used for cluster updates. */
- void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
+ QPID_BROKER_EXTERN void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
};
}
}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index ae4503328a..40dfba39f4 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -131,6 +131,7 @@ uint32_t Message::getRequiredCredit()
void Message::encode(framing::Buffer& buffer) const
{
+ sys::Mutex::ScopedLock l(lock);
//encode method and header frames
EncodeFrame f1(buffer);
frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
@@ -142,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const
void Message::encodeContent(framing::Buffer& buffer) const
{
+ sys::Mutex::ScopedLock l(lock);
//encode the payload of each content frame
EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
@@ -154,11 +156,13 @@ uint32_t Message::encodedSize() const
uint32_t Message::encodedContentSize() const
{
+ sys::Mutex::ScopedLock l(lock);
return frames.getContentSize();
}
uint32_t Message::encodedHeaderSize() const
{
+ sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size
//add up the size for all method and header frames in the frameset
SumFrameSize sum;
frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
@@ -218,8 +222,9 @@ void Message::releaseContent()
store->stage(pmsg);
staged = true;
}
- //ensure required credit is cached before content frames are released
+ //ensure required credit and size is cached before content frames are released
getRequiredCredit();
+ contentSize();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -354,6 +359,7 @@ public:
AMQHeaderBody* Message::getHeaderBody()
{
+ // expects lock to be held
if (copyHeaderOnWrite) {
CloneHeaderBody f;
frames.map_if(f, TypeFilter<HEADER_BODY>());
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index 709d99876b..f70c996975 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include "assert.h"
namespace qpid {
namespace broker {
@@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position)
bool MessageDeque::deleted(const QueuedMessage& m)
{
size_t i = index(m.position);
- if (i < messages.size()) {
+ if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
messages[i].status = QueuedMessage::DELETED;
clean();
return true;
@@ -53,7 +54,7 @@ size_t MessageDeque::size()
return available;
}
-void MessageDeque::release(const QueuedMessage& message)
+QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
{
size_t i = index(message.position);
if (i < messages.size()) {
@@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message)
if (head > i) head = i;
m.status = QueuedMessage::AVAILABLE;
++available;
+ return &messages[i];
}
} else {
+ assert(0);
QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
}
+ return 0;
}
+void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
+
bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (position < messages.front().position) return false;
@@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) {
}
} // namespace
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
+QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
//add padding to prevent gaps in sequence, which break the index
//calculation (needed for queue replication)
while (messages.size() && (added.position - messages.back().position) > 1)
@@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
++available;
- return false;//adding a message never causes one to be removed for deque
+ return &messages.back();
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
+ pushPtr(added);
+ return false; // adding a message never causes one to be removed for deque
}
void MessageDeque::updateAcquired(const QueuedMessage& acquired)
diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h
index bb5943b09b..9b53716d4e 100644
--- a/cpp/src/qpid/broker/MessageDeque.h
+++ b/cpp/src/qpid/broker/MessageDeque.h
@@ -48,6 +48,12 @@ class MessageDeque : public Messages
void foreach(Functor);
void removeIf(Predicate);
+ // For use by other Messages implementations that use MessageDeque as a FIFO index
+ // and keep pointers to its elements in their own indexing strctures.
+ void clean();
+ QueuedMessage* releasePtr(const QueuedMessage&);
+ QueuedMessage* pushPtr(const QueuedMessage& added);
+
private:
typedef std::deque<QueuedMessage> Deque;
Deque messages;
@@ -55,7 +61,6 @@ class MessageDeque : public Messages
size_t head;
size_t index(const framing::SequenceNumber&);
- void clean();
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index 5f450cd556..15cd56a676 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -19,11 +19,13 @@
*
*/
+#include "qpid/broker/MessageGroupManager.h"
+
+#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/types/Variant.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/MessageGroupManager.h"
+#include "qpid/types/Variant.h"
using namespace qpid::broker;
@@ -43,9 +45,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+/** return an iterator to the message at position, or members.end() if not found */
+MessageGroupManager::GroupState::MessageFifo::iterator
+MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position)
+{
+ MessageState mState(position);
+ MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
+ return (found->position == position) ? found : members.end();
+}
+
void MessageGroupManager::unFree( const GroupState& state )
{
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ GroupFifo::iterator pos = freeGroups.find( state.members.front().position );
assert( pos != freeGroups.end() && pos->second == &state );
freeGroups.erase( pos );
}
@@ -60,8 +71,8 @@ void MessageGroupManager::disown( GroupState& state )
{
state.owner.clear();
assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
+ assert(freeGroups.find(state.members.front().position) == freeGroups.end());
+ freeGroups[state.members.front().position] = &state;
}
MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
@@ -106,7 +117,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm )
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- state.members.push_back(qm.position);
+ GroupState::MessageState mState(qm.position);
+ state.members.push_back(mState);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.group << " total=" << total );
@@ -123,7 +135,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert(state.members.size()); // there are msgs present
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = true;
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.group << " acquired=" << state.acquired );
@@ -137,6 +151,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
GroupState& state = findGroup(qm);
assert( state.acquired != 0 );
state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = false;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
@@ -152,13 +169,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert( state.members.size() != 0 );
- assert( state.acquired != 0 );
- state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ if (m->acquired) {
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ }
- // likely to be at or near begin() if dequeued in order
+ // special case if qm is first (oldest) message in the group:
+ // may need to re-insert it back on the freeGroups list, as the index will change
bool reFreeNeeded = false;
- if (state.members.front() == qm.position) {
+ if (m == state.members.begin()) {
if (!state.owned()) {
// will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
// if on freelist, it is indexed by first member, which is about to be removed!
@@ -167,15 +188,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
}
state.members.pop_front();
} else {
- GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
- }
+ state.members.erase(m);
}
uint32_t total = state.members.size();
@@ -220,11 +233,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
GroupState& group = findGroup(next);
if (!group.owned()) {
//TODO: make acquire more efficient when we already have the message in question
- if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
+ if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front());
+ << "'s head message still pending. pos=" << group.members.front().position);
} else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
@@ -284,7 +297,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const
info[GROUP_TIMESTAMP] = 0;
if (g->second.members.size() != 0) {
QueuedMessage qm;
- if (messages.find(g->second.members.front(), qm) &&
+ if (messages.find(g->second.members.front().position, qm) &&
qm.payload &&
qm.payload->hasProperties<framing::DeliveryProperties>()) {
info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
@@ -353,6 +366,7 @@ namespace {
const std::string GROUP_OWNER("owner");
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
const std::string GROUP_STATE("group-state");
}
@@ -371,10 +385,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
group.setString(GROUP_OWNER, g->second.owner);
group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
framing::Array positions(TYPE_CODE_UINT32);
- for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p)
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p) {
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
+ acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
+ }
group.setArray(GROUP_POSITIONS, positions);
+ group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
}
state.setArray(GROUP_STATE, groupState);
@@ -425,13 +443,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
qName << "\": position encoding error!");
return;
}
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
+ if (!ok || positions.count() != acquiredMsgs.count()) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": acquired flag encoding error!");
+ return;
+ }
+
+ Array::const_iterator a = acquiredMsgs.begin();
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
+ GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
+ mState.acquired = (*a++)->getIntegerValue<bool>();
+ state.members.push_back(mState);
+ }
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
- state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
if (!state.owned()) {
assert(state.members.size());
- freeGroups[state.members.front()] = &messageGroups[state.group];
+ freeGroups[state.members.front().position] = &messageGroups[state.group];
}
}
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..2dd97ea2ff 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/cpp/src/qpid/broker/MessageGroupManager.h
@@ -28,11 +28,14 @@
#include "qpid/broker/MessageDistributor.h"
#include "qpid/sys/unordered_map.h"
+#include <deque>
+
namespace qpid {
namespace broker {
class QueueObserver;
class MessageDistributor;
+class Messages;
class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
{
@@ -45,19 +48,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
struct GroupState {
// note: update getState()/setState() when changing this object's state implementation
- typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+ // track which messages are in this group, and if they have been acquired
+ struct MessageState {
+ qpid::framing::SequenceNumber position;
+ bool acquired;
+ MessageState() : acquired(false) {}
+ MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {}
+ bool operator<(const MessageState& b) const { return position < b.position; }
+ };
+ typedef std::deque<MessageState> MessageFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- PositionFifo members; // msgs belonging to this group
+ MessageFifo members; // msgs belonging to this group, in enqueue order
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
+ MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &);
};
typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo;
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
index 048df45434..9b164d4e5c 100644
--- a/cpp/src/qpid/broker/MessageMap.cpp
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -27,7 +28,16 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
std::string MessageMap::getKey(const QueuedMessage& message)
{
@@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message)
size_t MessageMap::size()
{
- return messages.size();
+ size_t count(0);
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ }
+ return count;
}
bool MessageMap::empty()
{
- return messages.empty();
+ return size() == 0;//TODO: more efficient implementation
}
void MessageMap::release(const QueuedMessage& message)
{
- std::string key = getKey(message);
- Index::iterator i = index.find(key);
- if (i == index.end()) {
- index[key] = message;
- messages[message.position] = message;
- } //else message has already been replaced
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ }
}
bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
message = i->second;
return true;
} else {
@@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
message = i->second;
return true;
} else {
@@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::consume(QueuedMessage& message)
{
- Ordering::iterator i = messages.begin();
- if (i != messages.end()) {
- message = i->second;
- erase(i);
- return true;
- } else {
- return false;
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ return true;
+ }
}
+ return false;
}
const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- messages[added.position] = added;
+ QueuedMessage& a = messages[added.position];
+ a = added;
+ a.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Added message at " << a.position);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
+ result.first->second.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
return true;
}
}
@@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(i->second);
+ if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
}
}
void MessageMap::removeIf(Predicate p)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
- if (p(i->second)) {
- erase(i);
+ for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+ if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+ index.erase(getKey(i->second));
+ //Note: Removing from messages means that the subsequent
+ //call to deleted() for the same message will return
+ //false. At present that is not a problem. If this were
+ //changed to hold onto the message until dequeued
+ //(e.g. with REMOVED state), then the erase() below would
+ //need to take that into account.
+ messages.erase(i++);
+ } else {
+ ++i;
}
}
}
diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h
index d1b8217f9b..a668450250 100644
--- a/cpp/src/qpid/broker/MessageMap.h
+++ b/cpp/src/qpid/broker/MessageMap.h
@@ -43,7 +43,7 @@ class MessageMap : public Messages
size_t size();
bool empty();
- bool deleted(const QueuedMessage&);
+ virtual bool deleted(const QueuedMessage&);
void release(const QueuedMessage&);
virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index d807ef22b1..ab5ec7235a 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -3,13 +3,13 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownersip. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,96 +22,87 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <cmath>
namespace qpid {
namespace broker {
-PriorityQueue::PriorityQueue(int l) :
+PriorityQueue::PriorityQueue(int l) :
levels(l),
messages(levels, Deque()),
frontLevel(0), haveFront(false), cached(false) {}
-bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
+bool PriorityQueue::deleted(const QueuedMessage& qm) {
+ bool deleted = fifo.deleted(qm);
+ if (deleted) erase(qm);
+ return deleted;
+}
size_t PriorityQueue::size()
{
- size_t total(0);
- for (int i = 0; i < levels; ++i) {
- total += messages[i].size();
- }
- return total;
+ return fifo.size();
+}
+
+namespace {
+bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
}
void PriorityQueue::release(const QueuedMessage& message)
{
- uint p = getPriorityLevel(message);
- messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
- clearCache();
+ QueuedMessage* qm = fifo.releasePtr(message);
+ if (qm) {
+ uint p = getPriorityLevel(message);
+ messages[p].insert(
+ lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
+ clearCache();
+ }
}
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
-{
- QueuedMessage comp;
- comp.position = position;
- for (int i = 0; i < levels; ++i) {
- if (!messages[i].empty()) {
- unsigned long diff = position.getValue() - messages[i].front().position.getValue();
- long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
- Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
- if (l != messages[i].end() && l->position == position) {
- message = *l;
- if (remove) {
- messages[i].erase(l);
- clearCache();
- }
- return true;
- }
+
+void PriorityQueue::erase(const QueuedMessage& qm) {
+ size_t i = getPriorityLevel(qm);
+ if (!messages[i].empty()) {
+ long diff = qm.position.getValue() - messages[i].front()->position.getValue();
+ if (diff < 0) return;
+ long maxEnd = std::min(size_t(diff), messages[i].size());
+ QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
+ Deque::iterator l =
+ lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
+ if (l != messages[i].end() && (*l)->position == qm.position) {
+ messages[i].erase(l);
+ clearCache();
+ return;
}
}
- return false;
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, true);
+ bool acquired = fifo.acquire(position, message);
+ if (acquired) erase(message); // No longer available
+ return acquired;
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return find(position, message, false);
+ return fifo.find(position, message);
}
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool PriorityQueue::browse(
+ const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- QueuedMessage match;
- match.position = position+1;
- Deque::iterator lowest;
- bool found = false;
- for (int i = 0; i < levels; ++i) {
- Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
- if (m != messages[i].end()) {
- if (m->position == match.position) {
- message = *m;
- return true;
- } else if (!found || m->position < lowest->position) {
- lowest = m;
- found = true;
- }
- }
- }
- if (found) {
- message = *lowest;
- }
- return found;
+ return fifo.browse(position, message, unacquired);
}
bool PriorityQueue::consume(QueuedMessage& message)
{
if (checkFront()) {
- message = messages[frontLevel].front();
+ QueuedMessage* pm = messages[frontLevel].front();
messages[frontLevel].pop_front();
clearCache();
+ pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
+ message = *pm;
return true;
} else {
return false;
@@ -120,23 +111,27 @@ bool PriorityQueue::consume(QueuedMessage& message)
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
- messages[getPriorityLevel(added)].push_back(added);
+ QueuedMessage* qmp = fifo.pushPtr(added);
+ messages[getPriorityLevel(added)].push_back(qmp);
clearCache();
- return false;//adding a message never causes one to be removed for deque
+ return false; // Adding a message never causes one to be removed for deque
+}
+
+void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
+ fifo.updateAcquired(acquired);
}
void PriorityQueue::foreach(Functor f)
{
- for (int i = 0; i < levels; ++i) {
- std::for_each(messages[i].begin(), messages[i].end(), f);
- }
+ fifo.foreach(f);
}
void PriorityQueue::removeIf(Predicate p)
{
for (int priority = 0; priority < levels; ++priority) {
for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
- if (p(*i)) {
+ if (p(**i)) {
+ (*i)->status = QueuedMessage::DELETED; // Updates fifo index
i = messages[priority].erase(i);
clearCache();
} else {
@@ -144,6 +139,7 @@ void PriorityQueue::removeIf(Predicate p)
}
}
}
+ fifo.clean();
}
uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
index 67c31468d2..8628745db1 100644
--- a/cpp/src/qpid/broker/PriorityQueue.h
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
#include "qpid/sys/IntegerTypes.h"
#include <deque>
#include <vector>
@@ -32,7 +32,10 @@ namespace broker {
/**
* Basic priority queue with a configurable number of recognised
* priority levels. This is implemented as a separate deque per
- * priority level. Browsing is FIFO not priority order.
+ * priority level.
+ *
+ * Browsing is FIFO not priority order. There is a MessageDeque
+ * for fast browsing.
*/
class PriorityQueue : public Messages
{
@@ -48,23 +51,31 @@ class PriorityQueue : public Messages
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
-
+ void updateAcquired(const QueuedMessage& acquired);
void foreach(Functor);
void removeIf(Predicate);
+
static uint getPriority(const QueuedMessage&);
+
protected:
- typedef std::deque<QueuedMessage> Deque;
+ typedef std::deque<QueuedMessage*> Deque;
typedef std::vector<Deque> PriorityLevels;
virtual bool findFrontLevel(uint& p, PriorityLevels&);
const int levels;
+
private:
+ /** Available messages separated by priority and sorted in priority order.
+ * Holds pointers to the QueuedMessages in fifo
+ */
PriorityLevels messages;
+ /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */
+ MessageDeque fifo;
uint frontLevel;
bool haveFront;
bool cached;
-
- bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+
+ void erase(const QueuedMessage&);
uint getPriorityLevel(const QueuedMessage&) const;
void clearCache();
bool checkFront();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 015957927f..e7305c021d 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,8 +19,9 @@
*
*/
-#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Fairshare.h"
@@ -41,6 +42,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
@@ -56,7 +58,9 @@
#include <boost/intrusive_ptr.hpp>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
@@ -88,8 +92,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
+
+inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0) {
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+
+ uint64_t contentSize = msg->contentSize();
+ qStats->msgTotalEnqueues +=1;
+ bStats->msgTotalEnqueues += 1;
+ qStats->byteTotalEnqueues += contentSize;
+ bStats->byteTotalEnqueues += contentSize;
+ if (msg->isPersistent ()) {
+ qStats->msgPersistEnqueues += 1;
+ bStats->msgPersistEnqueues += 1;
+ qStats->bytePersistEnqueues += contentSize;
+ bStats->bytePersistEnqueues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
+}
+
+inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0){
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ uint64_t contentSize = msg->contentSize();
+
+ qStats->msgTotalDequeues += 1;
+ bStats->msgTotalDequeues += 1;
+ qStats->byteTotalDequeues += contentSize;
+ bStats->byteTotalDequeues += contentSize;
+ if (msg->isPersistent ()){
+ qStats->msgPersistDequeues += 1;
+ bStats->msgPersistDequeues += 1;
+ qStats->bytePersistDequeues += contentSize;
+ bStats->bytePersistDequeues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
}
+} // namespace
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -101,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
+ browserCount(0),
exclusive(0),
noLocal(false),
persistLastNode(false),
@@ -166,7 +220,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
//drop message
@@ -183,11 +237,16 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
{
+ Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->recoverEnqueued(msg);
}
-void Queue::recover(boost::intrusive_ptr<Message>& msg){
- if (policy.get()) policy->recoverEnqueued(msg);
+void Queue::recover(boost::intrusive_ptr<Message>& msg)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->recoverEnqueued(msg);
+ }
push(msg, true);
if (store){
@@ -209,11 +268,16 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject != 0){
- mgmtObject->inc_msgTxnEnqueues ();
- mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg->contentSize();
+ qStats->msgTxnEnqueues += 1;
+ qStats->byteTxnEnqueues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnEnqueues ();
- brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnEnqueues += 1;
+ bStats->byteTxnEnqueues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -222,7 +286,6 @@ void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
{
- Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
if (deleted) {
//
@@ -238,10 +301,20 @@ void Queue::requeue(const QueuedMessage& msg){
if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
- mgntDeqStats(msg.payload);
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
} else {
- messages->release(msg);
- listeners.populate(copy);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->release(msg);
+ observeRequeue(msg, locker);
+ listeners.populate(copy);
+ }
+
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
+ }
// for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
@@ -251,7 +324,6 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
- observeRequeue(msg, locker);
}
}
copy.notify();
@@ -259,10 +331,9 @@ void Queue::requeue(const QueuedMessage& msg){
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message, locker)) {
+ if (acquire(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -273,17 +344,20 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
{
- Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
- if (!allocator->allocate( consumer, msg )) {
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = allocator->allocate( consumer, msg );
+ }
+ if (!ok) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
return false;
}
QueuedMessage copy(msg);
- if (acquire( msg.position, copy, locker)) {
+ if (acquire( msg.position, copy)) {
QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
return true;
}
@@ -325,59 +399,73 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (allocator->nextConsumableMessage(c, msg)) {
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- dequeue(0, msg);
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
- }
+ bool found;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ found = allocator->nextConsumableMessage(c, msg);
+ if (!found) listeners.addListener(c);
+ }
+ if (!found) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ return NO_MESSAGES;
+ }
- continue;
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->setPosition(msg.position);
+ dequeue(0, msg);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
}
+ continue;
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ {
+ Mutex::ScopedLock locker(messageLock);
bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
(void) ok; assert(ok);
observeAcquire(msg, locker);
- m = msg;
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
}
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+ m = msg;
+ return CONSUMED;
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
}
} else {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- listeners.addListener(c);
- return NO_MESSAGES;
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
}
+
+ Mutex::ScopedLock locker(messageLock);
+ messages->release(msg);
+ return CANT_CONSUME;
}
}
bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
-
- if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+ bool found;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ found = allocator->nextBrowsableMessage(c, msg);
+ if (!found) listeners.addListener(c);
+ }
+ if (!found) { // no next available
QPID_LOG(debug, "No browsable messages available for consumer " <<
c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
return false;
}
@@ -435,60 +523,67 @@ bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
{
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ Mutex::ScopedLock locker(messageLock);
+ // NOTE: consumerCount is actually a count of all
+ // subscriptions, both acquiring and non-acquiring (browsers).
+ // Check for exclusivity of acquiring consumers.
+ size_t acquiringConsumers = consumerCount - browserCount;
+ if (c->preAcquires()) {
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName()
+ << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(acquiringConsumers) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName()
+ << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
}
+ else
+ browserCount++;
consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
+ observeConsumerAdd(*c, locker);
}
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
- }
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
consumerCount--;
+ if (!c->preAcquires()) browserCount--;
if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
- }
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
+ observeConsumerRemove(*c, locker);
}
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
}
QueuedMessage Queue::get(){
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->consume(msg))
- observeAcquire(msg, locker);
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = messages->consume(msg);
+ if (ok) observeAcquire(msg, locker);
+ }
+
+ if (ok && mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+
return msg;
}
@@ -520,22 +615,26 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- //
- // Report the count of discarded-by-ttl messages
- //
- if (mgmtObject && !expired.empty()) {
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
+ if (!expired.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(expired.size());
+ mgmtObject->inc_discardsTtl(expired.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(expired.size());
+ brokerMgmtObject->inc_discardsTtl(expired.size());
+ }
+ }
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
+ for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+ i != expired.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
}
- dequeue( 0, *i );
}
}
}
@@ -661,32 +760,46 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
Collector c(*mf.get(), purge_request);
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ }
- if (mgmtObject && !c.matches.empty()) {
- if (dest.get()) {
- mgmtObject->inc_reroutes(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_reroutes(c.matches.size());
- } else {
- mgmtObject->inc_discardsPurge(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ if (!c.matches.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(c.matches.size());
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(c.matches.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ brokerMgmtObject->inc_reroutes(c.matches.size());
+ }
+ } else {
+ mgmtObject->inc_discardsPurge(c.matches.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ }
+ }
}
- }
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- // Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
- // now reroute if necessary
- if (dest.get()) {
- assert(qmsg->payload);
- DeliverableMessage dmsg(qmsg->payload);
- dest->routeWithAlternate(dmsg);
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*qmsg, locker);
+ }
+ dequeue(0, *qmsg);
+ QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
+ // now reroute if necessary
+ if (dest.get()) {
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
+ dest->routeWithAlternate(dmsg);
+ }
}
}
return c.matches.size();
@@ -698,27 +811,51 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
Collector c(*mf.get(), qty);
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ }
+
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
+ if (!c.matches.empty()) {
// Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- // and move to destination Queue.
- assert(qmsg->payload);
- destq->deliver(qmsg->payload);
+
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ }
+
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*qmsg, locker);
+ }
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
+ }
}
return c.matches.size();
}
/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const Mutex::ScopedLock& locker)
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
{
- if (messages->acquire(position, msg)) {
- observeAcquire(msg, locker);
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = messages->acquire(position, msg);
+ if (ok) observeAcquire(msg, locker);
+ }
+ if (ok) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
++dequeueSincePurge;
return true;
}
@@ -728,35 +865,43 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- QueuedMessage removed;
+ QueuedMessage removed, qm(this, msg);
bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
- if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
- dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired) {
+ qm.position = ++sequence;
+ if (messages->push(qm, removed)) {
+ dequeueRequired = true;
observeAcquire(removed, locker);
- if (mgmtObject) {
- mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsLvq();
- }
}
- listeners.populate(copy);
observeEnqueue(qm, locker);
+ if (policy.get()) {
+ policy->enqueued(qm);
+ }
+ listeners.populate(copy);
}
- copy.notify();
+ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
+
+ mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
+
if (dequeueRequired) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ mgmtObject->inc_discardsLvq();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ brokerMgmtObject->inc_discardsLvq();
+ }
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
+ Mutex::ScopedLock locker(messageLock);
pendingDequeues.push_back(removed);
} else {
dequeue(0, removed);
}
}
+ copy.notify();
}
void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
@@ -767,8 +912,8 @@ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
/** function only provided for unit tests, or code not in critical message path */
uint32_t Queue::getEnqueueCompleteMessageCount() const
{
- Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
+ Mutex::ScopedLock locker(messageLock);
messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
@@ -781,13 +926,13 @@ uint32_t Queue::getMessageCount() const
uint32_t Queue::getConsumerCount() const
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
return consumerCount;
}
bool Queue::canAutoDelete() const
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
return autodelete && !consumerCount && !owner;
}
@@ -894,14 +1039,20 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
if (!ctxt) {
+ if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
observeDequeue(msg, locker);
}
}
+
+ if (!ctxt) {
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
+ }
+
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -918,14 +1069,24 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
- Mutex::ScopedLock locker(messageLock);
- observeDequeue(msg, locker);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
+ observeDequeue(msg, locker);
+ }
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
if (mgmtObject != 0) {
- mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg.payload->contentSize();
+ qStats->msgTxnDequeues += 1;
+ qStats->byteTxnDequeues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnDequeues();
- brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnDequeues += 1;
+ bStats->byteTxnDequeues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -934,10 +1095,20 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
+bool Queue::popAndDequeue(QueuedMessage& msg)
{
- if (messages->consume(msg)) {
- observeAcquire(msg, locker);
+ bool popped;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ popped = messages->consume(msg);
+ if (popped) observeAcquire(msg, locker);
+ }
+ if (popped) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
dequeue(0, msg);
return true;
} else {
@@ -947,13 +1118,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
/**
* Updates policy and management when a message has been dequeued,
- * expects messageLock to be held
+ * Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- mgntDeqStats(msg.payload);
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -963,17 +1131,11 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
-
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->acquired(msg);
@@ -983,17 +1145,11 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become re-available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- if (mgmtObject) {
- mgmtObject->inc_releases();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_releases();
- }
-
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->requeued(msg);
@@ -1003,6 +1159,33 @@ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
+/** updates queue observers when a new consumer has subscribed to this queue.
+ */
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerAdded(c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+/** updates queue observers when a consumer has unsubscribed from this queue.
+ */
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerRemoved(c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
@@ -1150,23 +1333,21 @@ void Queue::configureImpl(const FieldTable& _settings)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- {
- Mutex::ScopedLock locker(messageLock);
- QueuedMessage m;
- while(popAndDequeue(m, locker)) {
- DeliverableMessage msg(m.payload);
- if (alternateExchange.get()) {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- alternateExchange->routeWithAlternate(msg);
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
+
+ QueuedMessage m;
+ while(popAndDequeue(m)) {
+ DeliverableMessage msg(m.payload);
+ if (alternateExchange.get()) {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ alternateExchange->routeWithAlternate(msg);
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
- if (alternateExchange.get())
- alternateExchange->decAlternateUsers();
}
+ if (alternateExchange.get())
+ alternateExchange->decAlternateUsers();
if (store) {
barrier.destroy();
@@ -1177,7 +1358,7 @@ void Queue::destroyed()
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.clear();
}
}
@@ -1187,8 +1368,8 @@ void Queue::notifyDeleted()
QueueListeners::ListenerSet set;
{
Mutex::ScopedLock locker(messageLock);
- listeners.snapshot(set);
deleted = true;
+ listeners.snapshot(set);
}
set.notifyAll();
}
@@ -1206,6 +1387,7 @@ void Queue::unbind(ExchangeRegistry& exchanges)
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
+ Mutex::ScopedLock locker(messageLock);
policy = _policy;
if (policy.get())
policy->setQueue(this);
@@ -1213,6 +1395,7 @@ void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
const QueuePolicy* Queue::getPolicy()
{
+ Mutex::ScopedLock locker(messageLock);
return policy.get();
}
@@ -1302,7 +1485,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask
Queue::shared_ptr queue;
AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
void fire()
{
@@ -1388,11 +1571,15 @@ void Queue::countRejected() const
void Queue::countFlowedToDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdEnqueues();
- mgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdEnqueues += 1;
+ qStats->byteFtdEnqueues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdEnqueues();
- brokerMgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdEnqueues += 1;
+ bStats->byteFtdEnqueues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1400,11 +1587,15 @@ void Queue::countFlowedToDisk(uint64_t size) const
void Queue::countLoadedFromDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdDequeues();
- mgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdDequeues += 1;
+ qStats->byteFtdDequeues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdDequeues();
- brokerMgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdDequeues += 1;
+ bStats->byteFtdDequeues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1434,9 +1625,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
{
_qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
boost::shared_ptr<Exchange> dest;
- if (rerouteArgs.i_useAltExchange)
+ if (rerouteArgs.i_useAltExchange) {
+ if (!alternateExchange) {
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ etext = "No alternate-exchange defined";
+ break;
+ }
dest = alternateExchange;
- else {
+ } else {
try {
dest = broker->getExchanges().get(rerouteArgs.i_exchange);
} catch(const std::exception&) {
@@ -1486,8 +1682,12 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
<< "\": exchange does not exist.");
}
//process any pending dequeues
- for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- pendingDequeues.clear();
+ std::deque<QueuedMessage> pd;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ pendingDequeues.swap(pd);
+ }
+ for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
void Queue::insertSequenceNumbers(const std::string& key)
@@ -1497,10 +1697,10 @@ void Queue::insertSequenceNumbers(const std::string& key)
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
+/** updates queue observers and state when a message has become available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1509,10 +1709,6 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
}
- if (policy.get()) {
- policy->enqueued(m);
- }
- mgntEnqStats(m.payload);
}
void Queue::updateEnqueued(const QueuedMessage& m)
@@ -1520,12 +1716,16 @@ void Queue::updateEnqueued(const QueuedMessage& m)
if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue(0, payload, true);
- messages->updateAcquired(m);
- if (policy.get()) {
- policy->recoverEnqueued(payload);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->updateAcquired(m);
+ observeEnqueue(m, locker);
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ policy->enqueued(m);
+ }
}
- Mutex::ScopedLock locker(messageLock);
- observeEnqueue(m, locker);
+ mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1533,10 +1733,16 @@ void Queue::updateEnqueued(const QueuedMessage& m)
bool Queue::isEnqueued(const QueuedMessage& msg)
{
+ Mutex::ScopedLock locker(messageLock);
return !policy.get() || policy->isEnqueued(msg);
}
+// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's
+// state is not changed while listeners is referenced.
QueueListeners& Queue::getListeners() { return listeners; }
+
+// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's
+// state is not changed while messages is referenced.
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
@@ -1549,13 +1755,13 @@ void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.insert(observer);
}
void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.erase(observer);
}
@@ -1618,7 +1824,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
{
- Monitor::ScopedLock l(parent.messageLock);
+ Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */
if (parent.deleted) {
return false;
} else {
@@ -1639,3 +1845,6 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+}}
+
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e8573c17cc..9869a698c1 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -97,7 +97,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const bool autodelete;
MessageStore* store;
const OwnershipToken* owner;
- uint32_t consumerCount;
+ uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
+ uint32_t browserCount; // Count of non-acquiring subscriptions.
OwnershipToken* exclusive;
bool noLocal;
bool persistLastNode;
@@ -107,7 +108,22 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QueueListeners listeners;
std::auto_ptr<Messages> messages;
std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
- mutable qpid::sys::Mutex consumerLock;
+ /** messageLock is used to keep the Queue's state consistent while processing message
+ * events, such as message dispatch, enqueue, acquire, and dequeue. It must be held
+ * while updating certain members in order to keep these members consistent with
+ * each other:
+ * o messages
+ * o sequence
+ * o policy
+ * o listeners
+ * o allocator
+ * o observeXXX() methods
+ * o observers
+ * o pendingDequeues (TBD: move under separate lock)
+ * o exclusive OwnershipToken (TBD: move under separate lock)
+ * o consumerCount (TBD: move under separate lock)
+ * o Queue::UsageBarrier (TBD: move under separate lock)
+ */
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
@@ -143,52 +159,20 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes. Lock
- * must be held by caller */
+ /** update queue observers, stats, policy, etc when the messages' state changes.
+ * messageLock is held by caller */
void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock);
- // acquire message @ position, return true and set msg if acquire succeeds
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const sys::Mutex::ScopedLock& held);
+ void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
+ void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
+ bool popAndDequeue(QueuedMessage&);
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
-
- inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- brokerMgmtObject->inc_msgTotalEnqueues ();
- brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- brokerMgmtObject->inc_msgPersistEnqueues ();
- brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- }
- }
- inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- brokerMgmtObject->inc_msgTotalDequeues ();
- brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
- brokerMgmtObject->inc_msgPersistDequeues ();
- brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize());
- }
- }
- }
-
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
@@ -235,8 +219,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/**
* Bind self to specified exchange, and record that binding for unbinding on delete.
*/
- bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+ QPID_BROKER_EXTERN bool bind(
+ boost::shared_ptr<Exchange> exchange, const std::string& key,
+ const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
/** Acquire the message at the given position if it is available for acquire. Not to
* be used by clients, but used by the broker for queue management.
@@ -271,28 +256,29 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
+ QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
const ::qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
- const qpid::types::Variant::Map *filter=0);
+ QPID_BROKER_EXTERN uint32_t move(
+ const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
- bool isExclusiveOwner(const OwnershipToken* const o) const;
- void releaseExclusiveOwnership();
- bool setExclusiveOwner(const OwnershipToken* const o);
- bool hasExclusiveConsumer() const;
- bool hasExclusiveOwner() const;
+ QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
+ QPID_BROKER_EXTERN void releaseExclusiveOwnership();
+ QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
+ QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
+ QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
inline const framing::FieldTable& getSettings() const { return settings; }
inline bool isAutoDelete() const { return autodelete; }
- bool canAutoDelete() const;
+ QPID_BROKER_EXTERN bool canAutoDelete() const;
const QueueBindings& getBindings() const { return bindings; }
/**
@@ -301,8 +287,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
- void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
@@ -311,7 +297,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Inform the queue that a previous transactional dequeue
* committed.
*/
- void dequeueCommitted(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
/**
* Inform queue of messages that were enqueued, have since
@@ -319,7 +305,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* thus are still logically on the queue) - used in
* clustered broker.
*/
- void updateEnqueued(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
/**
* Test whether the specified message (identified by its
@@ -328,7 +314,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* have been delievered to a subscriber who has not yet
* accepted it).
*/
- bool isEnqueued(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
/**
* Acquires the next available (oldest) message
@@ -338,17 +324,17 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Get the message at position pos, returns true if found and sets msg */
QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
- const QueuePolicy* getPolicy();
+ QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
- void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
- boost::shared_ptr<Exchange> getAlternateExchange();
- bool isLocal(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
+ QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
+ QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
//PersistableQueue support:
- uint64_t getPersistenceId() const;
- void setPersistenceId(uint64_t persistenceId) const;
- void encode(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
+ QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
+ QPID_BROKER_EXTERN void setPersistenceId(uint64_t persistenceId) const;
+ QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;
+ QPID_BROKER_EXTERN uint32_t encodedSize() const;
/**
* Restores a queue from encoded data (used in recovery)
@@ -362,15 +348,15 @@ class Queue : public boost::enable_shared_from_this<Queue>,
virtual void setExternalQueueStore(ExternalQueueStore* inst);
// Increment the rejected-by-consumer counter.
- void countRejected() const;
- void countFlowedToDisk(uint64_t size) const;
- void countLoadedFromDisk(uint64_t size) const;
+ QPID_BROKER_EXTERN void countRejected() const;
+ QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const;
+ QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
// Manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
+ QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void query(::qpid::types::Variant::Map&) const;
+ QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+ QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
@@ -385,6 +371,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Apply f to each Observer on the queue */
template <class F> void eachObserver(F f) {
+ sys::Mutex::ScopedLock l(messageLock);
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}
@@ -396,31 +383,31 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** return current position sequence number for the next message on the queue.
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
- void addObserver(boost::shared_ptr<QueueObserver>);
- void removeObserver(boost::shared_ptr<QueueObserver>);
+ QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
+ QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
*/
- void recoveryComplete(ExchangeRegistry& exchanges);
+ QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges);
// For cluster update
- QueueListeners& getListeners();
- Messages& getMessages();
- const Messages& getMessages() const;
+ QPID_BROKER_EXTERN QueueListeners& getListeners();
+ QPID_BROKER_EXTERN Messages& getMessages();
+ QPID_BROKER_EXTERN const Messages& getMessages() const;
/**
* Reserve space in policy for an enqueued message that
* has been recovered in the prepared state (dtx only)
*/
- void recoverPrepared(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
- void flush();
+ QPID_BROKER_EXTERN void flush();
- Broker* getBroker();
+ QPID_BROKER_EXTERN Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
- void setDequeueSincePurge(uint32_t value);
+ QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
};
}
}
diff --git a/cpp/src/qpid/broker/QueueListeners.cpp b/cpp/src/qpid/broker/QueueListeners.cpp
index 32c208b073..0338a674cf 100644
--- a/cpp/src/qpid/broker/QueueListeners.cpp
+++ b/cpp/src/qpid/broker/QueueListeners.cpp
@@ -79,10 +79,6 @@ void QueueListeners::NotificationSet::notify()
std::for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
}
-bool QueueListeners::contains(Consumer::shared_ptr c) const {
- return c->inListeners;
-}
-
void QueueListeners::ListenerSet::notifyAll()
{
std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify));
diff --git a/cpp/src/qpid/broker/QueueListeners.h b/cpp/src/qpid/broker/QueueListeners.h
index 0659499253..ca844fd47e 100644
--- a/cpp/src/qpid/broker/QueueListeners.h
+++ b/cpp/src/qpid/broker/QueueListeners.h
@@ -30,7 +30,7 @@ namespace broker {
/**
* Track and notify components that wish to be notified of messages
* that become available on a queue.
- *
+ *
* None of the methods defined here are protected by locking. However
* the populate method allows a 'snapshot' to be taken of the
* listeners to be notified. NotificationSet::notify() may then be
@@ -61,11 +61,10 @@ class QueueListeners
friend class QueueListeners;
};
- void addListener(Consumer::shared_ptr);
- void removeListener(Consumer::shared_ptr);
+ void addListener(Consumer::shared_ptr);
+ void removeListener(Consumer::shared_ptr);
void populate(NotificationSet&);
void snapshot(ListenerSet&);
- bool contains(Consumer::shared_ptr c) const;
void notifyAll();
template <class F> void eachListener(F f) {
diff --git a/cpp/src/qpid/broker/QueuedMessage.cpp b/cpp/src/qpid/broker/QueuedMessage.cpp
new file mode 100644
index 0000000000..d40cc901ff
--- /dev/null
+++ b/cpp/src/qpid/broker/QueuedMessage.cpp
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueuedMessage.h"
+#include "Queue.h"
+#include <iostream>
+
+namespace qpid {
+namespace broker {
+
+std::ostream& operator<<(std::ostream& o, const QueuedMessage& qm) {
+ o << (qm.queue ? qm.queue->getName() : std::string()) << "[" << qm.position <<"]";
+ return o;
+}
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h
index 806da8e720..9d008193a0 100644
--- a/cpp/src/qpid/broker/QueuedMessage.h
+++ b/cpp/src/qpid/broker/QueuedMessage.h
@@ -22,6 +22,8 @@
#define _QueuedMessage_
#include "qpid/broker/Message.h"
+#include "BrokerImportExport.h"
+#include <iosfwd>
namespace qpid {
namespace broker {
@@ -47,6 +49,7 @@ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
return a.position < b.position;
}
+QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuedMessage&);
}}
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index d7adbd68ab..80fa5e1c0e 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/sys/SecuritySettings.h"
#include <boost/format.hpp>
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index e7d2259c80..64924bdd4c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -489,14 +489,14 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
exchangeName << " with routing-key " << msg->getRoutingKey()));
}
- cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->route(strategy);
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
//TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->getAlternate()->route(strategy);
}
if (!strategy.delivered) {
msg->destroy();
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 5a83fd0fb3..e5e1d2da16 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Credit.h"
#include "qpid/broker/Deliverable.h"
@@ -39,7 +40,6 @@
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
-#include "qpid/broker/AclModule.h"
#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
@@ -99,42 +99,44 @@ class SemanticState : private boost::noncopyable {
bool haveCredit();
protected:
- virtual bool doDispatch();
+ QPID_BROKER_EXTERN virtual bool doDispatch();
size_t unacked() { return parent->unacked.size(); }
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- ConsumerImpl(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
- virtual ~ConsumerImpl();
- OwnershipToken* getSession();
- virtual bool deliver(QueuedMessage& msg);
- bool filter(boost::intrusive_ptr<Message> msg);
- bool accept(boost::intrusive_ptr<Message> msg);
- void cancel() {}
-
- void disableNotify();
- void enableNotify();
- void notify();
- bool isNotifyEnabled() const;
-
- void requestDispatch();
-
- void setWindowMode();
- void setCreditMode();
- void addByteCredit(uint32_t value);
- void addMessageCredit(uint32_t value);
- void flush();
- void stop();
- void complete(DeliveryRecord&);
+ QPID_BROKER_EXTERN ConsumerImpl(
+ SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive,
+ const std::string& tag, const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ QPID_BROKER_EXTERN virtual ~ConsumerImpl();
+ QPID_BROKER_EXTERN OwnershipToken* getSession();
+ QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg);
+ QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void cancel() {}
+
+ QPID_BROKER_EXTERN void disableNotify();
+ QPID_BROKER_EXTERN void enableNotify();
+ QPID_BROKER_EXTERN void notify();
+ QPID_BROKER_EXTERN bool isNotifyEnabled() const;
+
+ QPID_BROKER_EXTERN void requestDispatch();
+
+ QPID_BROKER_EXTERN void setWindowMode();
+ QPID_BROKER_EXTERN void setCreditMode();
+ QPID_BROKER_EXTERN void addByteCredit(uint32_t value);
+ QPID_BROKER_EXTERN void addMessageCredit(uint32_t value);
+ QPID_BROKER_EXTERN void flush();
+ QPID_BROKER_EXTERN void stop();
+ QPID_BROKER_EXTERN void complete(DeliveryRecord&);
boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
- bool doOutput();
+ QPID_BROKER_EXTERN bool doOutput();
Credit& getCredit() { return credit; }
const Credit& getCredit() const { return credit; }
@@ -152,8 +154,11 @@ class SemanticState : private boost::noncopyable {
void acknowledged(const broker::QueuedMessage&) {}
// manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
- management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+ QPID_BROKER_EXTERN management::ManagementObject*
+ GetManagementObject(void) const;
+
+ QPID_BROKER_EXTERN management::Manageable::status_t
+ ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 4aad46f782..78f2e43ce0 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -21,8 +21,9 @@
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/SessionState.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -73,18 +74,12 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
if(passive){
AclModule* acl = getBroker().getAcl();
if (acl) {
- //TODO: why does a passive declare require create
- //permission? The purpose of the passive flag is to state
- //that the exchange should *not* created. For
- //authorisation a passive declare is similar to
- //exchange-query.
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchange,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange access request from " << getConnection().getUserId()));
}
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
@@ -274,22 +269,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
if (passive && !name.empty()) {
AclModule* acl = getBroker().getAcl();
if (acl) {
- //TODO: why does a passive declare require create
- //permission? The purpose of the passive flag is to state
- //that the queue should *not* created. For
- //authorisation a passive declare is similar to
- //queue-query (or indeed a qmf query).
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,&params) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue access request from " << getConnection().getUserId()));
}
queue = getQueue(name);
//TODO: check alternate-exchange is as expected
@@ -409,6 +398,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+ // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA.
if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0)
throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
<< queue->getName()));
@@ -548,13 +538,6 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
-std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
-{
- std::string encoded;
- encode(xid, encoded);
- return encoded;
-}
-
void SessionAdapter::DtxHandlerImpl::select()
{
state.selectDtx();
@@ -566,7 +549,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
{
try {
if (fail) {
- state.endDtx(convert(xid), true);
+ state.endDtx(DtxManager::convert(xid), true);
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
@@ -574,9 +557,9 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
}
} else {
if (suspend) {
- state.suspendDtx(convert(xid));
+ state.suspendDtx(DtxManager::convert(xid));
} else {
- state.endDtx(convert(xid), false);
+ state.endDtx(DtxManager::convert(xid), false);
}
return XaResult(XA_STATUS_XA_OK);
}
@@ -594,9 +577,9 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
}
try {
if (resume) {
- state.resumeDtx(convert(xid));
+ state.resumeDtx(DtxManager::convert(xid));
} else {
- state.startDtx(convert(xid), getBroker().getDtxManager(), join);
+ state.startDtx(DtxManager::convert(xid), getBroker().getDtxManager(), join);
}
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
@@ -607,7 +590,7 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
- bool ok = getBroker().getDtxManager().prepare(convert(xid));
+ bool ok = getBroker().getDtxManager().prepare(DtxManager::convert(xid));
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -618,7 +601,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
- bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
+ bool ok = getBroker().getDtxManager().commit(DtxManager::convert(xid), onePhase);
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -629,7 +612,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
- getBroker().getDtxManager().rollback(convert(xid));
+ getBroker().getDtxManager().rollback(DtxManager::convert(xid));
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -659,7 +642,7 @@ void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
- uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
+ uint32_t timeout = getBroker().getDtxManager().getTimeout(DtxManager::convert(xid));
return DtxGetTimeoutResult(timeout);
}
@@ -667,7 +650,7 @@ DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
uint32_t timeout)
{
- getBroker().getDtxManager().setTimeout(convert(xid), timeout);
+ getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout);
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 8987c4812f..bc056538b1 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -226,10 +226,8 @@ class Queue;
void rollback();
};
- class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper
+ class DtxHandlerImpl : public DtxHandler, public HandlerHelper
{
- std::string convert(const framing::Xid& xid);
-
public:
DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 752fa55535..b58c7c01c5 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -64,6 +64,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
+ if (detachedCallback) detachedCallback();
connection.closeChannel(channel.get());
}
@@ -117,4 +118,8 @@ void SessionHandler::attached(const std::string& name)
}
}
+void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
+ detachedCallback = cb;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 8cd5072574..4e2cfaa963 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,6 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include <boost/function.hpp>
namespace qpid {
class SessionState;
@@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
* This proxy is for sending such commands. In a clustered broker it will take steps
* to synchronize command order across the cluster. In a stand-alone broker
* it is just a synonym for getProxy()
- */
+ */
framing::AMQP_ClientProxy& getClusterOrderProxy() {
return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
}
@@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
+ void setDetachedCallback(boost::function<void()> cb);
+
protected:
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
@@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
+ boost::function<void ()> detachedCallback;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 644a3d628e..dd3ec13019 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -350,8 +350,9 @@ TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queu
return (q != qv.end()) ? bk : 0;
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void TopicExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
// Note: PERFORMANCE CRITICAL!!!
BindingList b;
std::map<std::string, BindingList>::iterator it;
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 636918f8a1..cc24e1411e 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -185,9 +185,7 @@ class TopicExchange : public virtual Exchange {
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
diff --git a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
index 2acc09cded..a38e6ac12a 100644
--- a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
#include <windows.h>