diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
| commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
| tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/broker | |
| parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
| download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
63 files changed, 2107 insertions, 1105 deletions
diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h index ff9281b6fc..7c180439cf 100644 --- a/cpp/src/qpid/broker/AclModule.h +++ b/cpp/src/qpid/broker/AclModule.h @@ -113,6 +113,7 @@ namespace acl { namespace broker { + class Connection; class AclModule { @@ -139,6 +140,11 @@ namespace broker { // Add specialized authorise() methods as required. + /** Approve connection by counting connections total, per-IP, and + * per-user. + */ + virtual bool approveConnection (const Connection& connection)=0; + virtual ~AclModule() {}; }; } // namespace broker diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 5b531e4636..d1706b5907 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -57,22 +57,25 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) conn->received(frame); } -Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args, - InitializeCallback init) : - link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), - initialize(init), detached(false) +Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, + CancellationListener l, const _qmf::ArgsLinkBridge& _args, + InitializeCallback init, const std::string& _queueName, const string& ae) : + link(_link), channel(_id), args(_args), mgmtObject(0), + listener(l), name(_name), + queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() + : _queueName), + altEx(ae), persistenceId(0), + connState(0), conn(0), initialize(init), detached(false), + useExistingQueue(!_queueName.empty()), + sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { - std::stringstream title; - title << id << "_" << name; - queueName += title.str(); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge - (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, + (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); @@ -90,23 +93,22 @@ void Bridge::create(Connection& c) conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - SessionHandler& sessionHandler = c.getChannel(id); - sessionHandler.setDetachedCallback( - boost::bind(&Bridge::sessionDetached, shared_from_this())); + SessionHandler& sessionHandler = c.getChannel(channel); + sessionHandler.setErrorListener(shared_from_this()); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler pushHandler.reset(new PushHandler(&c)); - channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - session->attach(name, false); + session->attach(sessionName, false); session->commandPoint(0,0); } else { - sessionHandler.attachAs(name); + sessionHandler.attachAs(sessionName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } @@ -115,7 +117,7 @@ void Bridge::create(Connection& c) if (initialize) initialize(*this, sessionHandler); else if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); } else { @@ -138,12 +140,13 @@ void Bridge::create(Connection& c) } bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? - bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings); + bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); if (!args.i_dynamic) peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); if (args.i_dynamic) { @@ -163,11 +166,12 @@ void Bridge::cancel(Connection&) { if (resetProxy()) { peer->getMessage().cancel(args.i_dest); - peer->getSession().detach(name); + peer->getSession().detach(sessionName); } QPID_LOG(debug, "Cancelled bridge " << name); } +/** Notify the bridge that the connection has closed */ void Bridge::closed() { if (args.i_dynamic) { @@ -177,9 +181,10 @@ void Bridge::closed() QPID_LOG(debug, "Closed bridge " << name); } -void Bridge::destroy() +/** Shut down the bridge */ +void Bridge::close() { - listener(this); + listener(this); // ask the LinkRegistry to destroy us } void Bridge::setPersistenceId(uint64_t pId) const @@ -187,8 +192,21 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } + +const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); +const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); + +bool Bridge::isEncodedBridge(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; +} + + Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string src; @@ -196,9 +214,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) string key; string id; string excludes; + string name; + + Link::shared_ptr link; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the bridge by host:port, not by name, and + * transport wasn't provided. Try to find a link using those paramters. + */ + buffer.getShortString(host); + port = buffer.getShort(); + + link = links.getLink(host, port); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); + return Bridge::shared_ptr(); + } + } else { + string linkName; + + buffer.getShortString(name); + buffer.getShortString(linkName); + link = links.getLink(linkName); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); + return Bridge::shared_ptr(); + } + } - buffer.getShortString(host); - port = buffer.getShort(); bool durable(buffer.getOctet()); buffer.getShortString(src); buffer.getShortString(dest); @@ -210,15 +252,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); - return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic, sync).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions did not provide a name for the bridge, so create one + */ + name = createName(link->getName(), src, dest, key); + } + + return links.declare(name, *link, durable, src, dest, key, is_queue, + is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const { - buffer.putShortString(string("bridge")); - buffer.putShortString(link->getHost()); - buffer.putShort(link->getPort()); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(link->getName()); buffer.putOctet(args.i_durable ? 1 : 0); buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); @@ -233,9 +281,9 @@ void Bridge::encode(Buffer& buffer) const uint32_t Bridge::encodedSize() const { - return link->getHost().size() + 1 // short-string (host) - + 7 // short-string ("bridge") - + 2 // port + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + link->getName().size() + 1 + 1 // durable + args.i_src.size() + 1 + args.i_dest.size() + 1 @@ -259,7 +307,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, { if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed - destroy(); + QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); + close(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; @@ -306,7 +355,7 @@ void Bridge::sendReorigin() } bool Bridge::resetProxy() { - SessionHandler& sessionHandler = conn->getChannel(id); + SessionHandler& sessionHandler = conn->getChannel(channel); if (!sessionHandler.getSession()) peer.reset(); else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); return peer.get(); @@ -318,7 +367,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang peer->getExchange().bind(queue, exchange, key, args); } else { QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); - destroy(); + close(); } } @@ -333,8 +382,38 @@ const string& Bridge::getLocalTag() const return link->getBroker()->getFederationTag(); } -void Bridge::sessionDetached() { +// SessionHandler::ErrorListener methods. +void Bridge::connectionException( + framing::connection::CloseCode code, const std::string& msg) +{ + if (errorListener) errorListener->connectionException(code, msg); +} + +void Bridge::channelException( + framing::session::DetachCode code, const std::string& msg) +{ + if (errorListener) errorListener->channelException(code, msg); +} + +void Bridge::executionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) errorListener->executionException(code, msg); +} + +void Bridge::detach() { detached = true; + if (errorListener) errorListener->detach(); +} + +std::string Bridge::createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + std::stringstream keystream; + keystream << linkName << "!" << src << "!" << dest << "!" << key; + return keystream.str(); } }} diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index 32b9fd1781..ee298afd45 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -29,6 +29,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/SessionHandler.h" #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h" #include "qmf/org/apache/qpid/broker/Bridge.h" @@ -43,29 +44,31 @@ class Connection; class ConnectionState; class Link; class LinkRegistry; -class SessionHandler; class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge, + public SessionHandler::ErrorListener, public boost::enable_shared_from_this<Bridge> { -public: + public: typedef boost::shared_ptr<Bridge> shared_ptr; typedef boost::function<void(Bridge*)> CancellationListener; typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback; - Bridge(Link* link, framing::ChannelId id, CancellationListener l, + Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l, const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, - InitializeCallback init + InitializeCallback init, const std::string& queueName="", + const std::string& altExchange="" ); ~Bridge(); - void create(Connection& c); - void cancel(Connection& c); - void closed(); - void destroy(); + QPID_BROKER_EXTERN void close(); bool isDurable() { return args.i_durable; } + Link *getLink() const { return link; } + const std::string getSrc() const { return args.i_src; } + const std::string getDest() const { return args.i_dest; } + const std::string getKey() const { return args.i_key; } bool isDetached() const { return detached; } @@ -80,7 +83,11 @@ public: uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; const std::string& getName() const { return name; } + + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedBridge(const std::string& key); // Exchange::DynamicBridge methods void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); @@ -93,10 +100,20 @@ public: std::string getQueueName() const { return queueName; } const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } -private: - // Callback when the bridge's session is detached. - void sessionDetached(); + /** create a name for a bridge (if none supplied by user config) */ + static std::string createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key); + + // SessionHandler::ErrorListener methods. + void connectionException(framing::connection::CloseCode code, const std::string& msg); + void channelException(framing::session::DetachCode, const std::string& msg); + void executionException(framing::execution::ErrorCode, const std::string& msg); + void detach(); + void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; } + private: struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } void handle(framing::AMQFrame& frame); @@ -108,19 +125,30 @@ private: std::auto_ptr<framing::AMQP_ServerProxy::Session> session; std::auto_ptr<framing::AMQP_ServerProxy> peer; - Link* link; - framing::ChannelId id; + Link* const link; + const framing::ChannelId channel; qmf::org::apache::qpid::broker::ArgsLinkBridge args; qmf::org::apache::qpid::broker::Bridge* mgmtObject; CancellationListener listener; std::string name; std::string queueName; + std::string altEx; mutable uint64_t persistenceId; ConnectionState* connState; Connection* conn; InitializeCallback initialize; bool detached; // Set when session is detached. bool resetProxy(); + + // connection Management (called by owning Link) + void create(Connection& c); + void cancel(Connection& c); + void closed(); + friend class Link; // to call create, cancel, closed() + boost::shared_ptr<ErrorListener> errorListener; + + const bool useExistingQueue; + const std::string sessionName; }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index f20cce18a2..b763dd4119 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -108,7 +108,6 @@ Broker::Options::Options(const std::string& name) : noDataDir(0), port(DEFAULT_PORT), workerThreads(5), - maxConnections(500), connectionBacklog(10), enableMgmt(1), mgmtPublish(1), @@ -128,8 +127,10 @@ Broker::Options::Options(const std::string& name) : queueFlowResumeRatio(70), queueThresholdEventRatio(80), defaultMsgGroup("qpid.no-group"), - timestampRcvMsgs(false), // set the 0.10 timestamp delivery property - linkMaintenanceInterval(2) + timestampRcvMsgs(false), // set the 0.10 timestamp delivery property + linkMaintenanceInterval(2), + linkHeartbeatInterval(120), + maxNegotiateTime(2000) // 2s { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -146,7 +147,6 @@ Broker::Options::Options(const std::string& name) : ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") @@ -171,6 +171,9 @@ Broker::Options::Options(const std::string& name) : ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.") ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS")) + ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS")) + ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation") + ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag") ; } @@ -208,7 +211,6 @@ Broker::Broker(const Broker::Options& conf) : inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), - connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) { @@ -227,7 +229,6 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); - mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); mgmtObject->set_mgmtPublish(conf.mgmtPublish); @@ -244,8 +245,11 @@ Broker::Broker(const Broker::Options& conf) : // management schema correct. Vhost* vhost = new Vhost(this, this); vhostObject = Vhost::shared_ptr(vhost); - framing::Uuid uuid(managementAgent->getUuid()); - federationTag = uuid.str(); + if (conf.fedTag.empty()) { + framing::Uuid uuid(managementAgent->getUuid()); + federationTag = uuid.str(); + } else + federationTag = conf.fedTag; vhostObject->setFederationTag(federationTag); queues.setParent(vhost); @@ -254,8 +258,11 @@ Broker::Broker(const Broker::Options& conf) : } else { // Management is disabled so there is no broker management ID. // Create a unique uuid to use as the federation tag. - framing::Uuid uuid(true); - federationTag = uuid.str(); + if (conf.fedTag.empty()) { + framing::Uuid uuid(true); + federationTag = uuid.str(); + } else + federationTag = conf.fedTag; } QueuePolicy::setDefaultMaxSize(conf.queueLimit); @@ -346,7 +353,7 @@ Broker::Broker(const Broker::Options& conf) : knownBrokers.push_back(Url(conf.knownHosts)); } - } catch (const std::exception& /*e*/) { + } catch (const std::exception&) { finalize(); throw; } @@ -443,7 +450,7 @@ Manageable* Broker::GetVhostObject(void) const Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& args, - string&) + string& text) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -458,6 +465,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { + /** Management is creating a Link to a remote broker using the host and port of + * the remote. This (old) interface does not allow management to specify a name + * for the link, nor does it allow multiple Links to the same remote. Use the + * "create()" broker method if these features are needed. + * TBD: deprecate this interface. + */ + QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='link' instead."); _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); @@ -466,13 +481,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); + text = "transport type not supported"; return Manageable::STATUS_NOT_IMPLEMENTED; } - std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, - hp.i_authMechanism, hp.i_username, hp.i_password); - if (hp.i_durable && response.second) - store->create(*response.first); + + // Does a link to the remote already exist? If so, re-use the existing link + // - this behavior is backward compatible with previous releases. + if (!links.getLink(hp.i_host, hp.i_port, transport)) { + // new link, need to generate a unique name for it + std::pair<Link::shared_ptr, bool> response = + links.declare(Link::createName(transport, hp.i_host, hp.i_port), + hp.i_host, hp.i_port, transport, + hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); + if (!response.first) { + text = "Unable to create Link"; + status = Manageable::STATUS_PARAMETER_INVALID; + break; + } + } status = Manageable::STATUS_OK; break; } @@ -543,6 +569,8 @@ const std::string TYPE_QUEUE("queue"); const std::string TYPE_EXCHANGE("exchange"); const std::string TYPE_TOPIC("topic"); const std::string TYPE_BINDING("binding"); +const std::string TYPE_LINK("link"); +const std::string TYPE_BRIDGE("bridge"); const std::string DURABLE("durable"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); @@ -554,6 +582,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); const std::string _TRUE("true"); const std::string _FALSE("false"); + +// parameters for creating a Link object, see mgmt schema +const std::string HOST("host"); +const std::string PORT("port"); +const std::string TRANSPORT("transport"); +const std::string AUTH_MECHANISM("authMechanism"); +const std::string USERNAME("username"); +const std::string PASSWORD("password"); + +// parameters for creating a Bridge object, see mgmt schema +const std::string LINK("link"); +const std::string SRC("src"); +const std::string DEST("dest"); +const std::string KEY("key"); +const std::string TAG("tag"); +const std::string EXCLUDES("excludes"); +const std::string SRC_IS_QUEUE("srcIsQueue"); +const std::string SRC_IS_LOCAL("srcIsLocal"); +const std::string DYNAMIC("dynamic"); +const std::string SYNC("sync"); } struct InvalidBindingIdentifier : public qpid::Exception @@ -603,6 +651,25 @@ struct UnknownObjectType : public qpid::Exception std::string getPrefix() const { return "unknown object type"; } }; +struct ReservedObjectName : public qpid::Exception +{ + ReservedObjectName(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return std::string("names prefixed with '") + + QPID_NAME_PREFIX + std::string("' are reserved"); } +}; + +struct UnsupportedTransport : public qpid::Exception +{ + UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "transport is not supported"; } +}; + +struct InvalidParameter : public qpid::Exception +{ + InvalidParameter(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "invalid parameter to method call"; } +}; + void Broker::createObject(const std::string& type, const std::string& name, const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) { @@ -674,6 +741,113 @@ void Broker::createObject(const std::string& type, const std::string& name, amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + + } else if (type == TYPE_LINK) { + + QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string host; + uint16_t port = 0; + std::string transport = TCP_TRANSPORT; + bool durable = false; + std::string authMech, username, password; + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == HOST) host = i->second.asString(); + else if (i->first == PORT) port = i->second.asUint16(); + else if (i->first == TRANSPORT) transport = i->second.asString(); + else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == AUTH_MECHANISM) authMech = i->second.asString(); + else if (i->first == USERNAME) username = i->second.asString(); + else if (i->first == PASSWORD) password = i->second.asString(); + else { + // TODO: strict checking here + } + } + + if (!getProtocolFactory(transport)) { + QPID_LOG(error, "Transport '" << transport << "' not supported."); + throw UnsupportedTransport(transport); + } + + std::pair<boost::shared_ptr<Link>, bool> rc; + rc = links.declare(name, host, port, transport, durable, authMech, username, password); + if (!rc.first) { + QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port << + "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\""); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } + + } else if (type == TYPE_BRIDGE) { + + QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string linkName; + std::string src; + std::string dest; + std::string key; + std::string id; + std::string excludes; + std::string queueName; + bool durable = false; + bool srcIsQueue = false; + bool srcIsLocal = false; + bool dynamic = false; + uint16_t sync = 0; + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + + if (i->first == LINK) linkName = i->second.asString(); + else if (i->first == SRC) src = i->second.asString(); + else if (i->first == DEST) dest = i->second.asString(); + else if (i->first == KEY) key = i->second.asString(); + else if (i->first == TAG) id = i->second.asString(); + else if (i->first == EXCLUDES) excludes = i->second.asString(); + else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second); + else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); + else if (i->first == DYNAMIC) dynamic = bool(i->second); + else if (i->first == SYNC) sync = i->second.asUint16(); + else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == QUEUE_NAME) queueName = i->second.asString(); + else { + // TODO: strict checking here + } + } + + boost::shared_ptr<Link> link; + if (linkName.empty() || !(link = links.getLink(linkName))) { + QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed."); + throw InvalidParameter(name); + } + std::pair<Bridge::shared_ptr, bool> rc = + links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, + dynamic, sync, + 0, + queueName); + + if (!rc.first) { + QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << + "; src=" << src << "; dest=" << dest << "; key=" << key); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } } else { throw UnknownObjectType(type); } @@ -696,6 +870,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } else if (type == TYPE_BINDING) { BindingIdentifier binding(name); unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else if (type == TYPE_LINK) { + boost::shared_ptr<Link> link = links.getLink(name); + if (link) { + link->close(); + } + } else if (type == TYPE_BRIDGE) { + boost::shared_ptr<Bridge> bridge = links.getBridge(name); + if (bridge) { + bridge->close(); + } } else { throw UnknownObjectType(type); } @@ -920,6 +1104,13 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( ManagementAgent::toMap(arguments), "created")); } + QPID_LOG_CAT(debug, model, "Create queue. name:" << name + << " user:" << userId + << " rhost:" << connectionId + << " durable:" << (durable ? "T" : "F") + << " owner:" << owner + << " autodelete:" << (autodelete ? "T" : "F") + << " alternateExchange:" << alternateExchange ); } return result; } @@ -942,6 +1133,10 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, if (managementAgent.get()) managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); + QPID_LOG_CAT(debug, model, "Delete queue. name:" << name + << " user:" << userId + << " rhost:" << connectionId + ); } @@ -993,6 +1188,12 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( ManagementAgent::toMap(arguments), "created")); } + QPID_LOG_CAT(debug, model, "Create exchange. name:" << name + << " user:" << userId + << " rhost:" << connectionId + << " type:" << type + << " alternateExchange:" << alternateExchange + << " durable:" << (durable ? "T" : "F")); } return result; } @@ -1017,7 +1218,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, if (managementAgent.get()) managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); - + QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name + << " user:" << userId + << " rhost:" << connectionId); } void Broker::bind(const std::string& queueName, @@ -1047,10 +1250,16 @@ void Broker::bind(const std::string& queueName, throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); } else { if (queue->bind(exchange, key, arguments)) { + getConfigurationObservers().bind(exchange, queue, key, arguments); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, queueName, key, ManagementAgent::toMap(arguments))); } + QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName + << " queue:" << queueName + << " key:" << key + << " user:" << userId + << " rhost:" << connectionId); } } } @@ -1082,12 +1291,33 @@ void Broker::unbind(const std::string& queueName, if (exchange->isDurable() && queue->isDurable()) { store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); } + getConfigurationObservers().unbind( + exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); } + QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName + << " queue:" << queueName + << " key:" << key + << " user:" << userId + << " rhost:" << connectionId); } } } +// FIXME aconway 2012-04-27: access to linkClientProperties is +// not properly thread safe, you could lose fields if 2 threads +// attempt to add a field concurrently. + +framing::FieldTable Broker::getLinkClientProperties() const { + sys::Mutex::ScopedLock l(linkClientPropertiesLock); + return linkClientProperties; +} + +void Broker::setLinkClientProperties(const framing::FieldTable& ft) { + sys::Mutex::ScopedLock l(linkClientPropertiesLock); + linkClientProperties = ft; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 135b9340f9..922d0558e5 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -40,6 +40,7 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" +#include "qpid/broker/ConfigurationObservers.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -64,8 +65,8 @@ namespace qpid { namespace sys { - class ProtocolFactory; - class Poller; +class ProtocolFactory; +class Poller; } struct Url; @@ -91,7 +92,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable, public RefCounted { -public: + public: struct Options : public qpid::Options { static const std::string DEFAULT_DATA_DIR_LOCATION; @@ -103,7 +104,6 @@ public: std::string dataDir; uint16_t port; int workerThreads; - int maxConnections; int connectionBacklog; bool enableMgmt; bool mgmtPublish; @@ -127,31 +127,14 @@ public: std::string defaultMsgGroup; bool timestampRcvMsgs; double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values. + uint16_t linkHeartbeatInterval; + uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation + std::string fedTag; private: std::string getHome(); }; - class ConnectionCounter { - int maxConnections; - int connectionCount; - sys::Mutex connectionCountLock; - public: - ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; - void inc_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount++; - } - void dec_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount--; - } - bool allowConnection() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - return (maxConnections <= connectionCount); - } - }; - private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; @@ -183,6 +166,7 @@ public: AclModule* acl; DataDir dataDir; ConnectionObservers connectionObservers; + ConfigurationObservers configurationObservers; QueueRegistry queues; ExchangeRegistry exchanges; @@ -203,9 +187,11 @@ public: bool recovery; bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; + mutable sys::Mutex linkClientPropertiesLock; + framing::FieldTable linkClientProperties; + public: QPID_BROKER_EXTERN virtual ~Broker(); @@ -317,8 +303,6 @@ public: management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - ConnectionCounter& getConnectionCounter() {return connectionCounter;} - /** * Never true in a stand-alone broker. In a cluster, return true * to defer delivery of messages deliveredg in a cluster-unsafe @@ -377,6 +361,14 @@ public: ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } + ConfigurationObservers& getConfigurationObservers() { return configurationObservers; } + + /** Properties to be set on outgoing link connections */ + QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; + QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&); + + /** Information identifying this system */ + boost::shared_ptr<const System> getSystem() const { return systemObject; } }; }} diff --git a/cpp/src/qpid/broker/ConfigurationObserver.h b/cpp/src/qpid/broker/ConfigurationObserver.h new file mode 100644 index 0000000000..701043db40 --- /dev/null +++ b/cpp/src/qpid/broker/ConfigurationObserver.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_CONFIGURATIONOBSERVER_H +#define QPID_BROKER_CONFIGURATIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include <string> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { +class Queue; +class Exchange; + + +/** + * Observer for changes to configuration (aka wiring) + */ +class ConfigurationObserver +{ + public: + virtual ~ConfigurationObserver() {} + virtual void queueCreate(const boost::shared_ptr<Queue>&) {} + virtual void queueDestroy(const boost::shared_ptr<Queue>&) {} + virtual void exchangeCreate(const boost::shared_ptr<Exchange>&) {} + virtual void exchangeDestroy(const boost::shared_ptr<Exchange>&) {} + virtual void bind(const boost::shared_ptr<Exchange>& , + const boost::shared_ptr<Queue>& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} + virtual void unbind(const boost::shared_ptr<Exchange>&, + const boost::shared_ptr<Queue>& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONFIGURATIONOBSERVER_H*/ diff --git a/cpp/src/qpid/broker/ConfigurationObservers.h b/cpp/src/qpid/broker/ConfigurationObservers.h new file mode 100644 index 0000000000..4c1159747d --- /dev/null +++ b/cpp/src/qpid/broker/ConfigurationObservers.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_CONFIGURATIONOBSERVERS_H +#define QPID_BROKER_CONFIGURATIONOBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConfigurationObserver.h" +#include "Observers.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace broker { + +/** + * A configuration observer that delegates to a collection of + * configuration observers. + * + * THREAD SAFE + */ +class ConfigurationObservers : public ConfigurationObserver, + public Observers<ConfigurationObserver> +{ + public: + void queueCreate(const boost::shared_ptr<Queue>& q) { + each(boost::bind(&ConfigurationObserver::queueCreate, _1, q)); + } + void queueDestroy(const boost::shared_ptr<Queue>& q) { + each(boost::bind(&ConfigurationObserver::queueDestroy, _1, q)); + } + void exchangeCreate(const boost::shared_ptr<Exchange>& e) { + each(boost::bind(&ConfigurationObserver::exchangeCreate, _1, e)); + } + void exchangeDestroy(const boost::shared_ptr<Exchange>& e) { + each(boost::bind(&ConfigurationObserver::exchangeDestroy, _1, e)); + } + void bind(const boost::shared_ptr<Exchange>& exchange, + const boost::shared_ptr<Queue>& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &ConfigurationObserver::bind, _1, exchange, queue, key, args)); + } + void unbind(const boost::shared_ptr<Exchange>& exchange, + const boost::shared_ptr<Queue>& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &ConfigurationObserver::unbind, _1, exchange, queue, key, args)); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONFIGURATIONOBSERVERS_H*/ diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e339cec03..8d250a32e5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -43,7 +43,7 @@ #include <iostream> #include <assert.h> - +using std::string; using namespace qpid::sys; using namespace qpid::framing; @@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputHandler* out_, bool link_, uint64_t objectId_, bool shadow_, - bool delayManagement) : + bool delayManagement, + bool authenticated_ +) : ConnectionState(out_, broker_), securitySettings(external), - adapter(*this, link_, shadow_), + shadow(shadow_), + authenticated(authenticated_), + adapter(*this, link_), link(link_), mgmtClosing(false), mgmtId(mgmtId_), @@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_), outboundTracker(*this) { outboundTracker.wrap(out); broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::addManagementObject() { @@ -141,6 +143,8 @@ Connection::~Connection() // a cluster-unsafe context. Don't raise an event in that case. if (!link && isClusterSafe()) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); + QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId() + << " rhost:" << mgmtId ); } broker.getConnectionObservers().closed(*this); @@ -148,8 +152,9 @@ Connection::~Connection() heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); - - if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } } void Connection::received(framing::AMQFrame& frame) { @@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() { mgmtObject->set_authIdentity(userId); agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId)); } + + QPID_LOG_CAT(debug, model, "Create connection. user:" << userId + << " rhost:" << mgmtId ); + } void Connection::setUserProxyAuth(bool b) @@ -300,6 +309,9 @@ void Connection::close(connection::CloseCode code, const string& text) heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -313,6 +325,9 @@ void Connection::sendClose() { heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -326,6 +341,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); @@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask { } }; +class LinkHeartbeatTask : public qpid::sys::TimerTask { + sys::Timer& timer; + Connection& connection; + bool heartbeatSeen; + + void fire() { + if (!heartbeatSeen) { + QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection"); + connection.abort(); + } else { + heartbeatSeen = false; + // Setup next firing + setupNextFire(); + timer.add(this); + } + } + +public: + LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) : + TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {} + + void heartbeatReceived() { heartbeatSeen = true; } +}; + + void Connection::abort() { // Make sure that we don't try to send a heartbeat as we're @@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) } } +void Connection::startLinkHeartbeatTimeoutTask() { + if (!linkHeartbeatTimer && heartbeat > 0) { + linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); + timer.add(linkHeartbeatTimer); + } +} + void Connection::restartTimeout() { if (timeoutTimer) timeoutTimer->touch(); + + if (linkHeartbeatTimer) { + static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived(); + } } bool Connection::isOpen() { return adapter.isOpen(); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 1b8bd83139..d01599ce54 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -27,8 +27,7 @@ #include <vector> #include <queue> -#include <boost/ptr_container/ptr_map.hpp> - +#include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/ConnectionHandler.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/SessionHandler.h" @@ -86,15 +85,22 @@ class Connection : public sys::ConnectionInputHandler, bool isLink = false, uint64_t objectId = 0, bool shadow=false, - bool delayManagement = false); + bool delayManagement = false, + bool authenticated=true); ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ SessionHandler& getChannel(framing::ChannelId channel); - /** Close the connection */ - void close(framing::connection::CloseCode code, const std::string& text); + /** Close the connection. Waits for the client to respond with close-ok + * before actually destroying the connection. + */ + QPID_BROKER_EXTERN void close( + framing::connection::CloseCode code, const std::string& text); + + /** Abort the connection. Close abruptly and immediately. */ + QPID_BROKER_EXTERN void abort(); // ConnectionInputHandler methods void received(framing::AMQFrame& frame); @@ -138,8 +144,7 @@ class Connection : public sys::ConnectionInputHandler, void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); void restartTimeout(); - void abort(); - + template <class F> void eachSessionHandler(F f) { for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) f(*ptr_map_ptr(i)); @@ -149,7 +154,10 @@ class Connection : public sys::ConnectionInputHandler, void setSecureConnection(SecureConnection* secured); /** True if this is a shadow connection in a cluster. */ - bool isShadow() { return shadow; } + bool isShadow() const { return shadow; } + + /** True if this connection is authenticated */ + bool isAuthenticated() const { return authenticated; } // Used by cluster to update connection status sys::AggregateOutput& getOutputTasks() { return outputTasks; } @@ -166,6 +174,7 @@ class Connection : public sys::ConnectionInputHandler, bool isOpen(); bool isLink() { return link; } + void startLinkHeartbeatTimeoutTask(); // Used by cluster during catch-up, see cluster::OutputInterceptor void doIoCallbacks(); @@ -179,6 +188,8 @@ class Connection : public sys::ConnectionInputHandler, ChannelMap channels; qpid::sys::SecuritySettings securitySettings; + bool shadow; + bool authenticated; ConnectionHandler adapter; const bool link; bool mgmtClosing; @@ -189,11 +200,10 @@ class Connection : public sys::ConnectionInputHandler, LinkRegistry& links; management::ManagementAgent* agent; sys::Timer& timer; - boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; + boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; uint64_t objectId; - bool shadow; framing::FieldTable clientProperties; /** diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 9e0020812b..d5d24ca629 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false))); @@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputControl& out, const std::string& id, return c.release(); } - + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 6894324117..06f442a47f 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -36,6 +36,9 @@ using namespace qpid; using namespace qpid::broker; + +using std::string; + using namespace qpid::framing; using qpid::sys::SecurityLayer; namespace _qmf = qmf::org::apache::qpid::broker; @@ -103,9 +106,10 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured) handler->secured = secured; } -ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow) : handler(new Handler(connection, isClient, isShadow)) {} +ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : + handler(new Handler(connection, isClient)) {} -ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) : +ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : proxy(c.getOutput()), connection(c), serverMode(!isClient), secured(0), isOpen(false) @@ -116,14 +120,13 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); - authenticator = SaslAuthenticator::createAuthenticator(c, isShadow); + authenticator = SaslAuthenticator::createAuthenticator(c); authenticator->getMechanisms(mechanisms); Array locales(0x95); boost::shared_ptr<FieldValue> l(new Str16Value(en_US)); locales.add(l); proxy.start(properties, mechanisms, locales); - } maxFrameSize = (64 * 1024) - 1; @@ -149,12 +152,20 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0); } catch (std::exception& /*e*/) { management::ManagementAgent* agent = connection.getAgent(); - if (agent) { + bool logEnabled; + QPID_LOG_TEST_CAT(debug, model, logEnabled); + if (logEnabled || agent) + { string error; string uid; authenticator->getError(error); authenticator->getUid(uid); - agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error)); + if (agent) { + agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error)); + } + QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId() + << " user:" << uid + << " reason:" << error ); } throw; } @@ -169,7 +180,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) AclModule* acl = connection.getBroker().getAcl(); FieldTable properties; if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); + proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("ACL denied " << connection.getUserId() + << " creating a federation link")); return; } QPID_LOG(info, "Connection is a federation link"); @@ -195,12 +208,20 @@ void ConnectionHandler::Handler::secureOk(const string& response) authenticator->step(response); } catch (std::exception& /*e*/) { management::ManagementAgent* agent = connection.getAgent(); - if (agent) { + bool logEnabled; + QPID_LOG_TEST_CAT(debug, model, logEnabled); + if (logEnabled || agent) + { string error; string uid; authenticator->getError(error); authenticator->getUid(uid); - agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error)); + if (agent) { + agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error)); + } + QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId() + << " user:" << uid + << " reason:" << error ); } throw; } @@ -278,7 +299,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, service, host, 0, // TODO -- mgoulish Fri Sep 24 2010 - 256, + 256, false ); // disallow interaction } std::string supportedMechanismsList; @@ -318,7 +339,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); } - FieldTable ft; + FieldTable ft = connection.getBroker().getLinkClientProperties(); ft.setInt(QPID_FED_LINK,1); ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); @@ -367,8 +388,14 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); connection.setFrameMax(maxFrameSize); - connection.setHeartbeat(heartbeatMax); - proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax); + // this method is only ever called when this Connection + // is a federation link where this Broker is acting as + // a client to another Broker + uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax); + connection.setHeartbeat(hb); + connection.startLinkHeartbeatTimeoutTask(); + + proxy.tuneOk(channelMax, maxFrameSize, hb); proxy.open("/", Array(), true); } diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index 2e25543308..9346e7b1ac 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -61,7 +61,7 @@ class ConnectionHandler : public framing::FrameHandler SecureConnection* secured; bool isOpen; - Handler(Connection& connection, bool isClient, bool isShadow=false); + Handler(Connection& connection, bool isClient); ~Handler(); void startOk(const qpid::framing::ConnectionStartOkBody& body); void startOk(const qpid::framing::FieldTable& clientProperties, @@ -99,7 +99,7 @@ class ConnectionHandler : public framing::FrameHandler bool handle(const qpid::framing::AMQMethodBody& method); public: - ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false ); + ConnectionHandler(Connection& connection, bool isClient ); void close(framing::connection::CloseCode code, const std::string& text); void heartbeat(); void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/broker/ConnectionObservers.h b/cpp/src/qpid/broker/ConnectionObservers.h index 07e515f3c9..e9014c80c3 100644 --- a/cpp/src/qpid/broker/ConnectionObservers.h +++ b/cpp/src/qpid/broker/ConnectionObservers.h @@ -23,9 +23,7 @@ */ #include "ConnectionObserver.h" -#include "qpid/sys/Mutex.h" -#include <set> -#include <algorithm> +#include "Observers.h" namespace qpid { namespace broker { @@ -35,18 +33,10 @@ namespace broker { * Calling a ConnectionObserver function will call that function on each observer. * THREAD SAFE. */ -class ConnectionObservers : public ConnectionObserver { +class ConnectionObservers : public ConnectionObserver, + public Observers<ConnectionObserver> +{ public: - void add(boost::shared_ptr<ConnectionObserver> observer) { - sys::Mutex::ScopedLock l(lock); - observers.insert(observer); - } - - void remove(boost::shared_ptr<ConnectionObserver> observer) { - sys::Mutex::ScopedLock l(lock); - observers.erase(observer); - } - void connection(Connection& c) { each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c))); } @@ -62,16 +52,6 @@ class ConnectionObservers : public ConnectionObserver { void forced(Connection& c, const std::string& text) { each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text)); } - - private: - typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers; - sys::Mutex lock; - Observers observers; - - template <class F> void each(F f) { - sys::Mutex::ScopedLock l(lock); - std::for_each(observers.begin(), observers.end(), f); - } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 682c75ed4f..64073621be 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -54,7 +54,9 @@ class Consumer bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } + /**@return the position of the last message seen by this consumer */ virtual framing::SequenceNumber getPosition() const { return position; } + virtual void setPosition(framing::SequenceNumber pos) { position = pos; } virtual bool deliver(QueuedMessage& msg) = 0; diff --git a/cpp/src/qpid/broker/Daemon.h b/cpp/src/qpid/broker/Daemon.h index a9cd98bce2..2bb9fc5577 100644 --- a/cpp/src/qpid/broker/Daemon.h +++ b/cpp/src/qpid/broker/Daemon.h @@ -74,7 +74,6 @@ class Daemon : private boost::noncopyable pid_t pid; int pipeFds[2]; - int lockFileFd; std::string lockFile; std::string pidDir; }; diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 5d9aea7509..2fa7ce0fc5 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -26,6 +26,9 @@ #include <iostream> using namespace qpid::broker; + +using std::string; + using namespace qpid::framing; using namespace qpid::sys; using qpid::management::Manageable; diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 8d20b0df81..82d4b4df15 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -35,6 +35,8 @@ namespace qpid { namespace broker { +using std::string; + using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; @@ -167,7 +169,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : - name(_name), durable(false), persistenceId(0), sequence(false), + name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 7376f814ed..fba752210f 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -174,8 +174,9 @@ public: bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } - Exchange::shared_ptr getAlternate() { return alternate; } - void setAlternate(Exchange::shared_ptr _alternate); + QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; } + QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate); + void incAlternateUsers() { alternateUsers++; } void decAlternateUsers() { alternateUsers--; } bool inUseAsAlternate() { return alternateUsers > 0; } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 43d7268dfb..b31c7bd7b8 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -19,6 +19,7 @@ * */ +#include "qpid/broker/Broker.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" @@ -42,38 +43,42 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, bool durable, const FieldTable& args){ - RWlock::ScopedWlock locker(lock); - ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - Exchange::shared_ptr exchange; - - if (type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); - }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); - }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); - }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); - }else if (type == ManagementDirectExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); - }else if (type == ManagementTopicExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); - }else if (type == Link::exchangeTypeName) { - exchange = Link::linkExchangeFactory(name); - }else{ - FunctionMap::iterator i = factory.find(type); - if (i == factory.end()) { - throw UnknownExchangeTypeException(); - } else { - exchange = i->second(name, durable, args, parent, broker); + Exchange::shared_ptr exchange; + std::pair<Exchange::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i == exchanges.end()) { + if (type == TopicExchange::typeName){ + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); + }else if(type == DirectExchange::typeName){ + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); + }else if(type == FanOutExchange::typeName){ + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); + }else if (type == HeadersExchange::typeName) { + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); + }else if (type == ManagementDirectExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); + }else if (type == ManagementTopicExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); + }else if (type == Link::exchangeTypeName) { + exchange = Link::linkExchangeFactory(name); + }else{ + FunctionMap::iterator i = factory.find(type); + if (i == factory.end()) { + throw UnknownExchangeTypeException(); + } else { + exchange = i->second(name, durable, args, parent, broker); + } } + exchanges[name] = exchange; + result = std::pair<Exchange::shared_ptr, bool>(exchange, true); + } else { + result = std::pair<Exchange::shared_ptr, bool>(i->second, false); } - exchanges[name] = exchange; - return std::pair<Exchange::shared_ptr, bool>(exchange, true); - } else { - return std::pair<Exchange::shared_ptr, bool>(i->second, false); } + if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange); + return result; } void ExchangeRegistry::destroy(const string& name){ @@ -82,12 +87,17 @@ void ExchangeRegistry::destroy(const string& name){ (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) || name == "qpid.management") throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'")); - RWlock::ScopedWlock locker(lock); - ExchangeMap::iterator i = exchanges.find(name); - if (i != exchanges.end()) { - i->second->destroy(); - exchanges.erase(i); + Exchange::shared_ptr exchange; + { + RWlock::ScopedWlock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i != exchanges.end()) { + exchange = i->second; + i->second->destroy(); + exchanges.erase(i); + } } + if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange); } Exchange::shared_ptr ExchangeRegistry::find(const string& name){ diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 2bce99b6fe..56c894c129 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -24,6 +24,9 @@ #include <algorithm> using namespace qpid::broker; + +using std::string; + using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 6648ae0422..9975d26c72 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -26,6 +26,9 @@ using namespace qpid::broker; + +using std::string; + using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index f21c861149..84dd163ac3 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -125,18 +125,20 @@ boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name return Exchange::shared_ptr(new LinkExchange(_name)); } -Link::Link(LinkRegistry* _links, - MessageStore* _store, +Link::Link(const string& _name, + LinkRegistry* _links, const string& _host, uint16_t _port, const string& _transport, + DestroyedListener l, bool _durable, const string& _authMechanism, const string& _username, const string& _password, Broker* _broker, - Manageable* parent) - : links(_links), store(_store), + Manageable* parent, + bool failover_) + : name(_name), links(_links), configuredTransport(_transport), configuredHost(_host), configuredPort(_port), host(_host), port(_port), transport(_transport), durable(_durable), @@ -149,7 +151,9 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), + listener(l), timerTask(new LinkTimerTask(*this, broker->getTimer())), + failover(failover_), failoverChannel(0) { if (parent != 0 && broker != 0) @@ -157,7 +161,10 @@ Link::Link(LinkRegistry* _links, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); + mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); agent->addObject(mgmtObject, 0, durable); } } @@ -169,13 +176,15 @@ Link::Link(LinkRegistry* _links, } broker->getTimer().add(timerTask); - stringstream _name; - _name << "qpid.link." << transport << ":" << host << ":" << port; - std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(), - exchangeTypeName); - failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); - assert(failoverExchange); - failoverExchange->setLink(this); + if (failover) { + stringstream exchangeName; + exchangeName << "qpid.link." << name; + std::pair<Exchange::shared_ptr, bool> rc = + broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); + failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); + } } Link::~Link () @@ -187,7 +196,8 @@ Link::~Link () if (mgmtObject != 0) mgmtObject->resourceDestroy (); - broker->getExchanges().destroy(failoverExchange->getName()); + if (failover) + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -239,16 +249,19 @@ void Link::established(Connection* c) if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - - Mutex::ScopedLock mutex(lock); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - connection = c; - if (closing) + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + connection = c; + isClosing = closing; + } + if (isClosing) destroy(); else // Process any IO tasks bridges added before established. - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -261,16 +274,26 @@ void Link::setUrl(const Url& u) { namespace { - /** invoked when session used to subscribe to remote's amq.failover exchange detaches */ - void sessionDetached(Link *link) { - QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName()); - } +class DetachedCallback : public SessionHandler::ErrorListener { + public: + DetachedCallback(const Link& link) : name(link.getName()) {} + void connectionException(framing::connection::CloseCode, const std::string&) {} + void channelException(framing::session::DetachCode, const std::string&) {} + void executionException(framing::execution::ErrorCode, const std::string&) {} + void detach() {} + private: + const std::string name; +}; } - void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; + + if (!hideManagement() && connection->GetManagementObject()) { + mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); + } + // Get default URL from known-hosts if not already set if (url.empty()) { const std::vector<Url>& known = connection->getKnownHosts(); @@ -282,80 +305,82 @@ void Link::opened() { QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } - // - // attempt to subscribe to failover exchange for updates from remote - // - - const std::string queueName = "qpid.link." + framing::Uuid(true).str(); - failoverChannel = nextChannel(); - - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); - failoverSession = queueName; - sessionHandler.attachAs(failoverSession); - - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - - remoteBroker.getQueue().declare(queueName, - "", // alt-exchange - false, // passive - false, // durable - true, // exclusive - true, // auto-delete - FieldTable()); - remoteBroker.getExchange().bind(queueName, - FAILOVER_EXCHANGE, - "", // no key - FieldTable()); - remoteBroker.getMessage().subscribe(queueName, - failoverExchange->getName(), - 1, // implied-accept mode - 0, // pre-acquire mode - false, // exclusive - "", // resume-id - 0, // resume-ttl + if (failover) { + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + failoverChannel = nextChannel(); + + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + sessionHandler.setErrorListener( + boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); + failoverSession = queueName; + sessionHandler.attachAs(failoverSession); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete + FieldTable()); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key FieldTable()); - remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); - remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + remoteBroker.getMessage().subscribe(queueName, + failoverExchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + } } void Link::closed(int, std::string text) { - bool isClosing = false; - { - Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); + Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); - connection = 0; - if (state == STATE_OPERATIONAL) { + connection = 0; + + if (!hideManagement()) { + mgmtObject->set_connectionRef(qpid::management::ObjectId()); + if (state == STATE_OPERATIONAL && agent) { stringstream addr; addr << host << ":" << port; - if (!hideManagement() && agent) - agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } + } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->closed(); - created.push_back(*i); - } - active.clear(); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + created.push_back(*i); + } + active.clear(); - if (state != STATE_FAILED && state != STATE_PASSIVE) - { - setStateLH(STATE_WAITING); - if (!hideManagement()) - mgmtObject->set_lastError (text); - } + if (state != STATE_FAILED && state != STATE_PASSIVE) + { + setStateLH(STATE_WAITING); + if (!hideManagement()) + mgmtObject->set_lastError (text); } - // Call destroy outside of the lock, don't want to be deleted with lock held. - if (isClosing) - destroy(); } -// Called in connection IO thread. +// Called in connection IO thread, cleans up the connection before destroying Link void Link::destroy () { Bridges toDelete; + + timerTask->cancel(); // call prior to locking so maintenance visit can finish { Mutex::ScopedLock mutex(lock); @@ -374,14 +399,13 @@ void Link::destroy () for (Bridges::iterator i = created.begin(); i != created.end(); i++) toDelete.push_back(*i); created.clear(); - - timerTask->cancel(); } + // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) - (*i)->destroy(); + (*i)->close(); toDelete.clear(); - links->destroy (configuredHost, configuredPort); + listener(this); // notify LinkRegistry that this Link has been destroyed } void Link::add(Bridge::shared_ptr bridge) @@ -423,7 +447,7 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); - if (state != STATE_OPERATIONAL) + if (state != STATE_OPERATIONAL || closing) return; // check for bridge session errors and recover @@ -460,7 +484,7 @@ void Link::ioThreadProcessing() void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - + if (closing) return; if (state == STATE_WAITING) { visitCount++; @@ -476,21 +500,27 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) + else if (state == STATE_OPERATIONAL && + (!active.empty() || !created.empty() || !cancellations.empty()) && + connection && connection->isOpen()) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); - } +} void Link::reconnectLH(const Address& a) { host = a.host; port = a.port; transport = a.protocol; - startConnectionLH(); + if (!hideManagement()) { stringstream errorString; - errorString << "Failed over to " << a; + errorString << "Failing over to " << a; mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); } + startConnectionLH(); } bool Link::tryFailoverLH() { @@ -499,15 +529,14 @@ bool Link::tryFailoverLH() { if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { - links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next); reconnectLH(next); return true; } return false; } -// Management updates for a linke are inconsistent in a cluster, so they are +// Management updates for a link are inconsistent in a cluster, so they are // suppressed. bool Link::hideManagement() const { return !mgmtObject || ( broker && broker->isInCluster()); @@ -536,18 +565,34 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return configuredHost; + return name; +} + +const std::string Link::ENCODED_IDENTIFIER("link.v2"); +const std::string Link::ENCODED_IDENTIFIER_V1("link"); + +bool Link::isEncodedLink(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string transport; string authMechanism; string username; string password; + string name; + if (kind == ENCODED_IDENTIFIER) { + // newer version provides a link name. + buffer.getShortString(name); + } buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -556,12 +601,21 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(username); buffer.getShortString(password); - return links.declare(host, port, transport, durable, authMechanism, username, password).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the Link by host:port, there was no name + * assigned. So create a name for the new Link. + */ + name = createName(transport, host, port); + } + + return links.declare(name, host, port, transport, durable, authMechanism, + username, password).first; } void Link::encode(Buffer& buffer) const { - buffer.putShortString(string("link")); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); buffer.putShortString(configuredHost); buffer.putShort(configuredPort); buffer.putShortString(configuredTransport); @@ -573,8 +627,9 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return configuredHost.size() + 1 // short-string (host) - + 5 // short-string ("link") + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + configuredHost.size() + 1 // short-string (host) + 2 // port + configuredTransport.size() + 1 // short-string(transport) + 1 // durable @@ -589,6 +644,7 @@ ManagementObject* Link::GetManagementObject (void) const } void Link::close() { + QPID_LOG(debug, "Link::close(), link=" << name ); Mutex::ScopedLock mutex(lock); if (!closing) { closing = true; @@ -609,36 +665,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_OK; case _qmf::Link::METHOD_BRIDGE : + /* TBD: deprecate this interface in favor of the Broker::create() method. The + * Broker::create() method allows the user to assign a name to the bridge. + */ + QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='bridge' instead."); _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; - QPID_LOG(debug, "Link::bridge() request received"); - - // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) { - text = "Can't create a durable route on a non-durable link"; - return Manageable::STATUS_USER; - } - - if (iargs.i_dynamic) { - Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); - if (exchange.get() == 0) { - text = "Exchange not found"; - return Manageable::STATUS_USER; - } - if (!exchange->supportsDynamicBinding()) { - text = "Exchange type does not support dynamic routing"; - return Manageable::STATUS_USER; + QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << + "; dest=" << iargs.i_dest << "; key=" << iargs.i_key); + + // Does a bridge already exist that has the src/dest/key? If so, re-use the + // existing bridge - this behavior is backward compatible with previous releases. + Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); + if (!bridge) { + // need to create a new bridge on this link. + std::pair<Bridge::shared_ptr, bool> rc = + links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key), + *this, iargs.i_durable, + iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); + if (!rc.first) { + text = "invalid parameters"; + return Manageable::STATUS_PARAMETER_INVALID; } } - - std::pair<Bridge::shared_ptr, bool> result = - links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic, iargs.i_sync); - - if (result.second && iargs.i_durable) - store->create(*result.first); - return Manageable::STATUS_OK; } @@ -666,11 +717,13 @@ void Link::closeConnection( const std::string& reason) { if (connection != 0) { // cancel our subscription to the failover exchange - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - if (sessionHandler.getSession()) { - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - remoteBroker.getMessage().cancel(failoverExchange->getName()); - remoteBroker.getSession().detach(failoverSession); + if (failover) { + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverSession); + } } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; @@ -716,6 +769,23 @@ void Link::setState(const framing::FieldTable& state) } } +std::string Link::createName(const std::string& transport, + const std::string& host, + uint16_t port) +{ + stringstream linkName; + linkName << QPID_NAME_PREFIX << transport << std::string(":") + << host << std::string(":") << port; + return linkName.str(); +} + + +bool Link::pendingConnection(const std::string& _host, uint16_t _port) const +{ + Mutex::ScopedLock mutex(lock); + return (isConnecting() && _port == port && _host == host); +} + const std::string Link::exchangeTypeName("qpid.LinkExchange"); diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index a97fa48664..f0cb90e73b 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -25,7 +25,6 @@ #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/BrokerImportExport.h" @@ -52,8 +51,8 @@ class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: mutable sys::Mutex lock; + const std::string name; LinkRegistry* links; - MessageStore* store; // these remain constant across failover - used to identify this link const std::string configuredTransport; @@ -64,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable { uint16_t port; std::string transport; - bool durable; + bool durable; + std::string authMechanism; std::string username; std::string password; @@ -85,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; + boost::function<void(Link*)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange + bool failover; // Do we subscribe to a failover exchange? uint failoverChannel; std::string failoverSession; @@ -101,33 +103,39 @@ class Link : public PersistableConfig, public management::Manageable { void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void destroy(); // Called when mgmt deletes this link + void destroy(); // Cleanup connection before link goes away void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailoverLH(); // Called during maintenance visit bool hideManagement() const; + void reconnectLH(const Address&); //called by LinkRegistry - void established(Connection*); // Called when connection is create + // connection management (called by LinkRegistry) + void established(Connection*); // Called when connection is created void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away - void reconnectLH(const Address&); //called by LinkRegistry + void notifyConnectionForced(const std::string text); void closeConnection(const std::string& reason); + bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote? friend class LinkRegistry; // to call established, opened, closed public: typedef boost::shared_ptr<Link> shared_ptr; + typedef boost::function<void(Link*)> DestroyedListener; - Link(LinkRegistry* links, - MessageStore* store, + Link(const std::string& name, + LinkRegistry* links, const std::string& host, uint16_t port, const std::string& transport, + DestroyedListener l, bool durable, const std::string& authMechanism, const std::string& username, const std::string& password, Broker* broker, - management::Manageable* parent = 0); + management::Manageable* parent = 0, + bool failover=true); virtual ~Link(); /** these return the *configured* transport/host/port, which does not change over the @@ -139,7 +147,7 @@ class Link : public PersistableConfig, public management::Manageable { /** returns the current address of the remote, which may be different from the configured transport/host/port due to failover. Returns true if connection is active */ - bool getRemoteAddress(qpid::Address& addr) const; + QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const; bool isDurable() { return durable; } void maintenanceVisit (); @@ -148,15 +156,17 @@ class Link : public PersistableConfig, public management::Manageable { void cancel(Bridge::shared_ptr); QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. - QPID_BROKER_EXTERN void close(); // Close the link from within the broker. + + // Close the link. + QPID_BROKER_EXTERN void close(); std::string getAuthMechanism() { return authMechanism; } std::string getUsername() { return username; } std::string getPassword() { return password; } Broker* getBroker() { return broker; } - void notifyConnectionForced(const std::string text); void setPassive(bool p); + bool isConnecting() const { return state == STATE_CONNECTING; } // PersistableConfig: void setPersistenceId(uint64_t id) const; @@ -165,7 +175,10 @@ class Link : public PersistableConfig, public management::Manageable { void encode(framing::Buffer& buffer) const; const std::string& getName() const; + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedLink(const std::string& key); // Manageable entry points management::ManagementObject* GetManagementObject(void) const; @@ -178,6 +191,16 @@ class Link : public PersistableConfig, public management::Manageable { // replicate internal state of this Link for clustering void getState(framing::FieldTable& state) const; void setState(const framing::FieldTable& state); + + /** create a name for a link (if none supplied by user config) */ + static std::string createName(const std::string& transport, + const std::string& host, + uint16_t port); + + /** The current connction for this link. Note returns 0 if the link is not + * presently connected. + */ + Connection* getConnection() { return connection; } }; } } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index d89f220d1b..0507fe6521 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _broker) : LinkRegistry::~LinkRegistry() {} +/** find link by the *configured* remote address */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, + uint16_t port, + const std::string& transport) +{ + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { + Link::shared_ptr& link = i->second; + if (link->getHost() == host && + link->getPort() == port && + (transport.empty() || link->getTransport() == transport)) + return link; + } + return boost::shared_ptr<Link>(); +} -void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) +/** find link by name */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) { Mutex::ScopedLock locker(lock); - std::string oldKey = createKey(oldAddress); - std::string newKey = createKey(newAddress); - if (links.find(newKey) != links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - } else { - LinkMap::iterator i = links.find(oldKey); - if (i == links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - } else { - links[newKey] = i->second; - links.erase(oldKey); - QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - } - } + LinkMap::iterator l = links.find(name); + if (l != links.end()) + return l->second; + return boost::shared_ptr<Link>(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, + const string& host, uint16_t port, const string& transport, bool durable, const string& authMechanism, const string& username, - const string& password) + const string& password, + bool failover) { Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(name); if (i == links.end()) { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, - authMechanism, username, password, - broker, parent)); - links[key] = link; + link = Link::shared_ptr ( + new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent, failover)); + if (durable && store) store->create(*link); + links[name] = link; + QPID_LOG(debug, "Creating new link; name=" << name ); return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } -pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, - uint16_t port, +/** find bridge by link & route info */ +Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { + if (i->second->getSrc() == src && i->second->getDest() == dest && + i->second->getKey() == key && i->second->getLink() && + i->second->getLink()->getName() == link.getName()) { + return i->second; + } + } + return Bridge::shared_ptr(); +} + +/** find bridge by name */ +Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + BridgeMap::iterator b = bridges.find(name); + if (b != bridges.end()) + return b->second; + return Bridge::shared_ptr(); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, const std::string& excludes, bool dynamic, uint16_t sync, - Bridge::InitializeCallback init + Bridge::InitializeCallback init, + const std::string& queueName, + const std::string& altExchange ) { Mutex::ScopedLock locker(lock); - QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); + // Durable bridges are only valid on durable links + if (durable && !link.isDurable()) { + QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + if (dynamic) { + Exchange::shared_ptr exchange = broker->getExchanges().get(src); + if (exchange.get() == 0) { + QPID_LOG(error, "Exchange not found, name='" << src << "'" ); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + if (!exchange->supportsDynamicBinding()) { + QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + } - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(name); if (b == bridges.end()) { _qmf::ArgsLinkBridge args; @@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, args.i_sync = sync; bridge = Bridge::shared_ptr - (new Bridge (l->second.get(), l->second->nextChannel(), - boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), - args, init)); - bridges[bridgeKey] = bridge; - l->second->add(bridge); + (new Bridge (name, &link, link.nextChannel(), + boost::bind(&LinkRegistry::destroyBridge, this, _1), + args, init, queueName, altExchange)); + bridges[name] = bridge; + link.add(bridge); + if (durable && store) + store->create(*bridge); + + QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << + "' from " << src << " to " << dest << " (" << key << ")"); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); } return std::pair<Bridge::shared_ptr, bool>(b->second, false); } -void LinkRegistry::destroy(const string& host, const uint16_t port) +/** called back by the link when it has completed its cleanup and can be removed. */ +void LinkRegistry::linkDestroyed(Link *link) { + QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(link->getName()); if (i != links.end()) { if (i->second->isDurable() && store) @@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } -void LinkRegistry::destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key) +/** called back by bridge when its destruction has been requested */ +void LinkRegistry::destroyBridge(Bridge *bridge) { + QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName()); Mutex::ScopedLock locker(lock); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); - - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return; - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(bridge->getName()); if (b == bridges.end()) return; - l->second->cancel(b->second); + Link *link = b->second->getLink(); + if (link) { + link->cancel(b->second); + } if (b->second->isDurable()) store->destroy(*(b->second)); bridges.erase(b); @@ -219,26 +266,71 @@ MessageStore* LinkRegistry::getStore() const { return store; } -Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) -{ - // Convert keyOrMgmtId to a host:port key. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses - // connection management IDs. Currently sys:: protocol factories - // and IO plugins construct the IDs and LinkRegistry parses them. - size_t separator = keyOrMgmtId.find('-'); - if (separator == std::string::npos) separator = 0; - std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); +namespace { + void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) + { + // Extract host and port of remote broker from connection id string. + // + // TODO aconway 2011-02-01: centralize code that constructs/parses connection + // management IDs. Currently sys:: protocol factories and IO plugins construct the + // IDs and LinkRegistry parses them. + // KAG: current connection id format assumed: + // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are + // contained within brackets "[...]", example: + // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us + // if this assumption changes! + size_t separator = connId.find('-'); + assert(separator != std::string::npos); + std::string remote = connId.substr(separator+1, std::string::npos); + separator = remote.rfind(":"); + assert(separator != std::string::npos); + *host = remote.substr(0, separator); + // IPv6 - host is bracketed by "[]", strip them + if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { + *host = host->substr(1, host->length() - 2); + } + try { + *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); + } catch (const boost::bad_lexical_cast&) { + QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); + assert(false); + } + } +} +/** find the Link that corresponds to the given connection */ +Link::shared_ptr LinkRegistry::findLink(const std::string& connId) +{ Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) return l->second; - else return Link::shared_ptr(); + ConnectionMap::iterator c = connections.find(connId); + if (c != connections.end()) { + LinkMap::iterator l = links.find(c->second); + if (l != links.end()) + return l->second; + } + return Link::shared_ptr(); } void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { - Link::shared_ptr link = findLink(key); + // find a link that is attempting to connect to the remote, and + // create a mapping from connection id to link + QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); + std::string host; + uint16_t port = 0; + extractHostPort( key, &host, &port ); + Link::shared_ptr link; + { + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) { + if (l->second->pendingConnection(host, port)) { + link = l->second; + connections[key] = link->getName(); + break; + } + } + } + if (link) { link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); @@ -343,20 +435,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } -std::string LinkRegistry::createKey(const qpid::Address& a) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - return createKey(a.host, a.port); -} - -std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - stringstream keystream; - keystream << host << ":" << port; - return keystream.str(); -} - void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); @@ -369,10 +447,12 @@ void LinkRegistry::setPassive(bool p) } void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + Mutex::ScopedLock locker(lock); for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); } void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + Mutex::ScopedLock locker(lock); for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); } diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 8e9d2f4b0d..5a39b62bd1 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -42,9 +42,11 @@ namespace broker { class LinkRegistry { typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; + typedef std::map<std::string, std::string> ConnectionMap; - LinkMap links; - BridgeMap bridges; + LinkMap links; /** indexed by name of Link */ + BridgeMap bridges; /** indexed by name of Bridge */ + ConnectionMap connections; /** indexed by connection identifier, gives link name */ qpid::sys::Mutex lock; Broker* broker; @@ -54,15 +56,18 @@ namespace broker { std::string realm; boost::shared_ptr<Link> findLink(const std::string& key); - static std::string createKey(const Address& address); - static std::string createKey(const std::string& host, uint16_t port); - // Methods called by the connection observer. + // Methods called by the connection observer, key is connection identifier void notifyConnection (const std::string& key, Connection* c); void notifyOpened (const std::string& key); void notifyClosed (const std::string& key); void notifyConnectionForced (const std::string& key, const std::string& text); - friend class LinkRegistryConnectionObserver; + friend class LinkRegistryConnectionObserver; + + /** Notify the registry that a Link has been destroyed */ + void linkDestroyed(Link*); + /** Request to destroy a Bridge */ + void destroyBridge(Bridge*); public: QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests @@ -70,17 +75,29 @@ namespace broker { QPID_BROKER_EXTERN ~LinkRegistry(); QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool> - declare(const std::string& host, + declare(const std::string& name, + const std::string& host, uint16_t port, const std::string& transport, bool durable, const std::string& authMechanism, const std::string& username, - const std::string& password); + const std::string& password, + bool failover=true); + + /** determine if Link exists */ + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& name); + /** host,port,transport will be matched against the configured values, which may + be different from the current values due to failover */ + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& configHost, + uint16_t configPort, + const std::string& configTransport = std::string()); QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool> - declare(const std::string& host, - uint16_t port, + declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -91,16 +108,18 @@ namespace broker { const std::string& excludes, bool dynamic, uint16_t sync, - Bridge::InitializeCallback=0 + Bridge::InitializeCallback=0, + const std::string& queueName="", + const std::string& altExchange="" ); - - QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port); - - QPID_BROKER_EXTERN void destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key); + /** determine if Bridge exists */ + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const std::string& name); + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key); /** * Register the manageable parent for declared queues @@ -126,11 +145,6 @@ namespace broker { QPID_BROKER_EXTERN uint16_t getPort (const std::string& key); /** - * Called by links failing over to new address - */ - void changeAddress(const Address& oldAddress, const Address& newAddress); - - /** * Called to alter passive state. In passive state the links * and bridges managed by a link registry will be recorded and * updated but links won't actually establish connections and diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 40dfba39f4..4dd8a349dd 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -384,6 +384,18 @@ void Message::addTraceId(const std::string& id) } } +void Message::clearTrace() +{ + sys::Mutex::ScopedLock l(lock); + if (isA<MessageTransferBody>()) { + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); + std::string trace = headers.getAsString(X_QPID_TRACE); + if (!trace.empty()) { + headers.setString(X_QPID_TRACE, ""); + } + } +} + void Message::setTimestamp() { sys::Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index dda45d73e6..90e4eec889 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -161,6 +161,7 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); + void clearTrace(); void forcePersistent(); bool isForcedPersistent(); diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index f70c996975..83c8ca6868 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing::SequenceNumber& position) bool MessageDeque::deleted(const QueuedMessage& m) { size_t i = index(m.position); - if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) { - messages[i].status = QueuedMessage::DELETED; - clean(); - return true; - } else { - return false; + if (i < messages.size()) { + QueuedMessage *qm = &messages[i]; + if (qm->status != QueuedMessage::DELETED) { + qm->status = QueuedMessage::DELETED; + qm->payload = 0; // message no longer needed + clean(); + return true; + } } + return false; } size_t MessageDeque::size() @@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; ++available; + clean(); // QPID-4046: let producer help clean the backlog of deleted messages return &messages.back(); } @@ -173,12 +177,37 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired) } } +namespace { +bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; } +} // namespace + +void MessageDeque::setPosition(const framing::SequenceNumber& n) { + size_t i = index(n+1); + if (i >= messages.size()) return; // Nothing to do. + + // Assertion to verify the precondition: no messaages after n. + assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) == + messages.end()); + messages.erase(messages.begin()+i, messages.end()); + if (head >= messages.size()) head = messages.size() - 1; + // Re-count the available messages + available = 0; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) ++available; + } +} + void MessageDeque::clean() { - while (messages.size() && messages.front().status == QueuedMessage::DELETED) { + // QPID-4046: If a queue has multiple consumers, then it is possible for a large + // collection of deleted messages to build up. Limit the number of messages cleaned + // up on each call to clean(). + size_t count = 0; + while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) { messages.pop_front(); - if (head) --head; + count += 1; } + head = (head > count) ? head - count : 0; } void MessageDeque::foreach(Functor f) diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h index 9b53716d4e..c5670b2a72 100644 --- a/cpp/src/qpid/broker/MessageDeque.h +++ b/cpp/src/qpid/broker/MessageDeque.h @@ -44,7 +44,7 @@ class MessageDeque : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); - + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 9b164d4e5c..592f3fefde 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include <algorithm> namespace qpid { namespace broker { @@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) QueuedMessage& a = messages[added.position]; a = added; a.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Added message at " << a.position); + QPID_LOG(debug, "Added message " << a); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); result.first->second.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); + QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first); return true; } } +void MessageMap::setPosition(const framing::SequenceNumber& seq) { + // Nothing to do, just assert that the precondition is respected and there + // are no undeleted messages after seq. + (void) seq; assert(messages.empty() || (--messages.end())->first <= seq); +} + void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h index a668450250..1f0481cb6b 100644 --- a/cpp/src/qpid/broker/MessageMap.h +++ b/cpp/src/qpid/broker/MessageMap.h @@ -6,7 +6,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file +o * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -50,6 +50,7 @@ class MessageMap : public Messages virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool consume(QueuedMessage&); virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); virtual void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h index 61e9fa110a..45f5e6cd81 100644 --- a/cpp/src/qpid/broker/Messages.h +++ b/cpp/src/qpid/broker/Messages.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/framing/SequenceNumber.h" #include <boost/function.hpp> namespace qpid { @@ -101,14 +102,22 @@ class Messages virtual void updateAcquired(const QueuedMessage&) { } /** + * Set the position of the back of the queue. Next message enqueued will be n+1. + *@pre Any messages with seq > n must already be dequeued. + */ + virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0; + + /** * Apply, the functor to each message held */ + virtual void foreach(Functor) = 0; /** * Remove every message held that for which the specified * predicate returns true */ virtual void removeIf(Predicate) = 0; + private: }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h index 6ea25c9797..2e9f7febe2 100644 --- a/cpp/src/qpid/broker/NameGenerator.h +++ b/cpp/src/qpid/broker/NameGenerator.h @@ -32,6 +32,7 @@ namespace qpid { NameGenerator(const std::string& base); std::string generate(); }; + const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names } } diff --git a/cpp/src/qpid/broker/Observers.h b/cpp/src/qpid/broker/Observers.h new file mode 100644 index 0000000000..c62f75d6d0 --- /dev/null +++ b/cpp/src/qpid/broker/Observers.h @@ -0,0 +1,69 @@ +#ifndef QPID_BROKER_OBSERVERS_H +#define QPID_BROKER_OBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> +#include <vector> +#include <algorithm> + +namespace qpid { +namespace broker { + +/** + * Base class for collections of observers with thread-safe add/remove and traversal. + */ +template <class Observer> +class Observers +{ + public: + void add(boost::shared_ptr<Observer> observer) { + sys::Mutex::ScopedLock l(lock); + observers.push_back(observer); + } + + void remove(boost::shared_ptr<Observer> observer) { + sys::Mutex::ScopedLock l(lock); + typename List::iterator i = std::find(observers.begin(), observers.end(), observer); + observers.erase(i); + } + + protected: + typedef std::vector<boost::shared_ptr<Observer> > List; + + sys::Mutex lock; + List observers; + + template <class F> void each(F f) { + List copy; + { + sys::Mutex::ScopedLock l(lock); + copy = observers; + } + std::for_each(copy.begin(), copy.end(), f); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_OBSERVERS_H*/ diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp index ab5ec7235a..9a0fead744 100644 --- a/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { fifo.updateAcquired(acquired); } +void PriorityQueue::setPosition(const framing::SequenceNumber& n) { + fifo.setPosition(n); +} + void PriorityQueue::foreach(Functor f) { fifo.foreach(f); diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h index 8628745db1..301367358b 100644 --- a/cpp/src/qpid/broker/PriorityQueue.h +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -52,6 +52,7 @@ class PriorityQueue : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/PrivateImplRef.h b/cpp/src/qpid/broker/PrivateImplRef.h index 5932ab882b..d20c2f4608 100644 --- a/cpp/src/qpid/broker/PrivateImplRef.h +++ b/cpp/src/qpid/broker/PrivateImplRef.h @@ -88,15 +88,15 @@ template <class T> class PrivateImplRef { /** Set the implementation pointer in a handle */ static void set(T& t, const intrusive_ptr& p) { if (t.impl == p) return; - if (t.impl) boost::intrusive_ptr_release(t.impl); + if (t.impl) intrusive_ptr_release(t.impl); t.impl = p.get(); - if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + if (t.impl) intrusive_ptr_add_ref(t.impl); } // Helper functions to implement the ctor, dtor, copy, assign - static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); } static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } - static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); } static T& assign(T& t, const T& x) { set(t, get(x)); return t;} }; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e7305c021d..d5267c78dc 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -49,6 +49,7 @@ #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include <iostream> #include <algorithm> @@ -67,6 +68,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using std::string; using std::for_each; using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; @@ -176,7 +178,8 @@ Queue::Queue(const string& _name, bool _autodelete, ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); + mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete); + mgmtObject->set_exclusive(_owner != 0); agent->addObject(mgmtObject, 0, store != 0); brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); if (brokerMgmtObject) @@ -587,21 +590,51 @@ QueuedMessage Queue::get(){ return msg; } -bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +namespace { +bool collectIf(QueuedMessage& qm, Messages::Predicate predicate, + std::deque<QueuedMessage>& collection) { - if (message.payload->hasExpired()) { - expired.push_back(message); + if (predicate(qm)) { + collection.push_back(qm); return true; } else { return false; } } +bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); } +} // namespace + +void Queue::dequeueIf(Messages::Predicate predicate, + std::deque<QueuedMessage>& dequeued) +{ + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); + } + if (!dequeued.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(dequeued.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin(); + i != dequeued.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } + } +} + /** *@param lapse: time since the last purgeExpired */ -void Queue::purgeExpired(qpid::sys::Duration lapse) -{ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. @@ -609,37 +642,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - std::deque<QueuedMessage> expired; - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); - } - - if (!expired.empty()) { + std::deque<QueuedMessage> dequeued; + dequeueIf(boost::bind(&isExpired, _1), dequeued); + if (dequeued.size()) { if (mgmtObject) { - mgmtObject->inc_acquires(expired.size()); - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(expired.size()); - brokerMgmtObject->inc_discardsTtl(expired.size()); - } - } - - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf() call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); - } - dequeue( 0, *i ); + mgmtObject->inc_discardsTtl(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(dequeued.size()); } } } } - namespace { // for use with purge/move below - collect messages that match a given filter // @@ -797,6 +811,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> // now reroute if necessary if (dest.get()) { assert(qmsg->payload); + qmsg->payload->clearTrace(); DeliverableMessage dmsg(qmsg->payload); dest->routeWithAlternate(dmsg); } @@ -888,9 +903,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (mgmtObject) { mgmtObject->inc_acquires(); mgmtObject->inc_discardsLvq(); - if (brokerMgmtObject) + if (brokerMgmtObject) { brokerMgmtObject->inc_acquires(); brokerMgmtObject->inc_discardsLvq(); + } } if (isRecovery) { //can't issue new requests for the store until @@ -1470,12 +1486,18 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->destroyed(); + + if (broker.getManagementAgent()) + broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName())); + QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName() + << " user:" << userId + << " rhost:" << connectionId ); } } @@ -1483,9 +1505,11 @@ struct AutoDeleteTask : qpid::sys::TimerTask { Broker& broker; Queue::shared_ptr queue; + std::string connectionId; + std::string userId; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} + AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {} void fire() { @@ -1493,19 +1517,19 @@ struct AutoDeleteTask : qpid::sys::TimerTask //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added - tryAutoDeleteImpl(broker, queue); + tryAutoDeleteImpl(broker, queue, connectionId, userId); } }; -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) { if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); - queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time)); broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { - tryAutoDeleteImpl(broker, queue); + tryAutoDeleteImpl(broker, queue, connectionId, userId); } } @@ -1659,13 +1683,28 @@ void Queue::query(qpid::types::Variant::Map& results) const if (allocator) allocator->query(results); } +namespace { +struct After { + framing::SequenceNumber seq; + After(framing::SequenceNumber s) : seq(s) {} + bool operator()(const QueuedMessage& qm) { return qm.position > seq; } +}; +} // namespace + + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); + if (n < sequence) { + std::deque<QueuedMessage> dequeued; + dequeueIf(After(n), dequeued); + messages->setPosition(n); + } sequence = n; QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { + Mutex::ScopedLock locker(messageLock); return sequence; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9869a698c1..a31e0002ea 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void configureImpl(const qpid::framing::FieldTable& settings); void checkNotDeleted(const Consumer::shared_ptr& c); void notifyDeleted(); + void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued); public: @@ -343,7 +344,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * exclusive owner */ static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); - static void tryAutoDelete(Broker& broker, Queue::shared_ptr); + static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId); virtual void setExternalQueueStore(ExternalQueueStore* inst); @@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>, std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); } - /** Set the position sequence number for the next message on the queue. - * Must be >= the current sequence number. - * Used by cluster to replicate queues. + /** + * Set the sequence number for the back of the queue, the + * next message enqueued will be pos+1. + * If pos > getPosition() this creates a gap in the sequence numbers. + * if pos < getPosition() the back of the queue is reset to pos, + * + * The _caller_ must ensure that any messages after pos have been dequeued. + * + * Used by HA/cluster code for queue replication. */ QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); - /** return current position sequence number for the next message on the queue. + + /** + *@return sequence number for the back of the queue. The next message pushed + * will be at getPosition+1 */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index f15bb45c01..14fe5f4022 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -75,8 +75,8 @@ namespace { result = v->get<int64_t>(); QPID_LOG(debug, "Got integer value for " << key << ": " << result); if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); + } else if (v->convertsTo<std::string>()) { + std::string s(v->get<std::string>()); QPID_LOG(debug, "Got string value for " << key << ": " << s); std::istringstream convert(s); if (convert >> result && result >= 0) return result; diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index d5b4c1ae86..3978420f4e 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -133,8 +133,8 @@ T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue result = v->get<T>(); QPID_LOG(debug, "Got integer value for " << key << ": " << result); if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); + } else if (v->convertsTo<std::string>()) { + std::string s(v->get<std::string>()); QPID_LOG(debug, "Got string value for " << key << ": " << s); std::istringstream convert(s); if (convert >> result && result >= 0 && convert.eof()) return result; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 236d5ae34c..1401356444 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueEvents.h" @@ -46,40 +47,49 @@ QueueRegistry::declare(const string& declareName, bool durable, definition from persistente record*/) { - RWlock::ScopedWlock locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); + Queue::shared_ptr queue; + std::pair<Queue::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); - if (alternate) { - queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } - if (!recovering) { - //apply settings & create persistent record if required - queue->create(arguments); + if (i == queues.end()) { + queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); + if (alternate) { + queue->setAlternateExchange(alternate);//need to do this *before* create + alternate->incAlternateUsers(); + } + if (!recovering) { + //apply settings & create persistent record if required + queue->create(arguments); + } else { + //i.e. recovering a queue for which we already have a persistent record + queue->configure(arguments); + } + queues[name] = queue; + if (lastNode) queue->setLastNodeFailure(); + result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { - //i.e. recovering a queue for which we already have a persistent record - queue->configure(arguments); + result = std::pair<Queue::shared_ptr, bool>(i->second, false); } - queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); - - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); } + if (broker && queue) broker->getConfigurationObservers().queueCreate(queue); + return result; } -void QueueRegistry::destroyLH (const string& name){ - queues.erase(name); -} - -void QueueRegistry::destroy (const string& name){ - RWlock::ScopedWlock locker(lock); - destroyLH (name); +void QueueRegistry::destroy(const string& name) { + Queue::shared_ptr q; + { + qpid::sys::RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + Queue::shared_ptr q = i->second; + queues.erase(i); + } + } + if (broker && q) broker->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index f724e6b10c..a354513c5f 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,7 +61,7 @@ class QueueRegistry { QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare( const std::string& name, bool durable = false, - bool autodelete = false, + bool autodelete = false, const OwnershipToken* owner = 0, boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(), const qpid::framing::FieldTable& args = framing::FieldTable(), @@ -82,9 +82,8 @@ class QueueRegistry { QPID_BROKER_EXTERN void destroy(const std::string& name); template <class Test> bool destroyIf(const std::string& name, Test test) { - qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { - destroyLH (name); + destroy(name); return true; } else { return false; @@ -127,13 +126,13 @@ class QueueRegistry { for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i) f(i->second); } - + /** * Change queue mode when cluster size drops to 1 node, expands again * in practice allows flow queue to disk when last name to be exectuted */ void updateQueueClusterState(bool lastNode); - + private: typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap; QueueMap queues; @@ -144,12 +143,9 @@ private: management::Manageable* parent; bool lastNode; //used to set mode on queue declare Broker* broker; - - //destroy impl that assumes lock is already held: - void destroyLH (const std::string& name); }; - + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index d08409695e..858535637a 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) { string kind; - + uint32_t p = buffer.getPosition(); buffer.getShortString (kind); - if (kind == "link") + buffer.setPosition(p); + + if (Link::isEncodedLink(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); - else if (kind == "bridge") + else if (Bridge::isEncodedBridge(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index 80fa5e1c0e..2d7c820b63 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ # include "config.h" #endif +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" @@ -37,6 +38,7 @@ using qpid::sys::cyrus::CyrusSecurityLayer; #endif +using std::string; using namespace qpid::framing; using qpid::sys::SecurityLayer; using qpid::sys::SecuritySettings; @@ -164,13 +166,17 @@ void SaslAuthenticator::fini(void) #endif -std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow ) +std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c ) { if (c.getBroker().getOptions().auth) { - if ( isShadow ) - return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); - else - return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); + // The cluster creates non-authenticated connections for internal shadow connections + // that are never connected to an external client. + if ( !c.isAuthenticated() ) + return std::auto_ptr<SaslAuthenticator>( + new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); + else + return std::auto_ptr<SaslAuthenticator>( + new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } else { QPID_LOG(debug, "SASL: No Authentication Performed"); return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); @@ -178,7 +184,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti } -NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), +NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm), encrypt(e) {} NullAuthenticator::~NullAuthenticator() {} @@ -214,7 +220,7 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } else if (i != string::npos) { //authorization id is first null delimited field uid = response->substr(0, i); - }//else not a valid SASL PLAIN response, throw error? + }//else not a valid SASL PLAIN response, throw error? if (!uid.empty()) { //append realm if it has not already been added i = uid.find(realm); @@ -226,7 +232,12 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } } else { connection.setUserId("anonymous"); - } + } + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } @@ -240,7 +251,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) #if HAVE_SASL -CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : +CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { init(); @@ -271,17 +282,17 @@ void CyrusAuthenticator::init() NULL, /* Callbacks */ 0, /* Connection flags */ &sasl_conn); - + if (SASL_OK != code) { QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Unable to perform authentication"); } sasl_security_properties_t secprops; - + //TODO: should the actual SSF values be configurable here? secprops.min_ssf = encrypt ? 10: 0; secprops.max_ssf = 256; @@ -319,14 +330,14 @@ void CyrusAuthenticator::init() secprops.property_values = 0; secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */ /* - * The nodict flag restricts SASL authentication mechanisms - * to those that are not susceptible to dictionary attacks. - * They are: + * The nodict flag restricts SASL authentication mechanisms + * to those that are not susceptible to dictionary attacks. + * They are: * SRP * PASSDSS-3DES-1 * EXTERNAL */ - if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; + if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops); if (result != SASL_OK) { throw framing::InternalErrorException(QPID_MSG("SASL error: " << result)); @@ -371,10 +382,10 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) "", separator, "", &list, &list_len, &count); - + if (SASL_OK != code) { QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Mechanism listing failed"); @@ -382,17 +393,17 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) string mechanism; unsigned int start; unsigned int end; - + QPID_LOG(info, "SASL: Mechanism list: " << list); - + end = 0; do { start = end; - + // Seek to end of next mechanism while (end < list_len && separator[0] != list[end]) end++; - + // Record the mechanism mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start)))); end++; @@ -404,20 +415,20 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) { const char *challenge; unsigned int challenge_len; - + // This should be at same debug level as mech list in getMechanisms(). QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); int code = sasl_server_start(sasl_conn, mechanism.c_str(), (response ? response->c_str() : 0), (response ? response->size() : 0), &challenge, &challenge_len); - + processAuthenticationStep(code, challenge, challenge_len); qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslMechanism(mechanism); } - + void CyrusAuthenticator::step(const string& response) { const char *challenge; @@ -439,10 +450,17 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen // authentication failure, when one is available throw ConnectionForcedException("Authenticated username unavailable"); } - QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); connection.setUserId(uid); + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } + + QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } else if (SASL_CONTINUE == code) { string challenge_str(challenge, challenge_len); @@ -490,7 +508,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslSsf(ssf); return securityLayer; } diff --git a/cpp/src/qpid/broker/SaslAuthenticator.h b/cpp/src/qpid/broker/SaslAuthenticator.h index 4e5d43214c..e5ecc9f6ec 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/cpp/src/qpid/broker/SaslAuthenticator.h @@ -54,7 +54,7 @@ public: static void init(const std::string& saslName, std::string const & saslConfigPath ); static void fini(void); - static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow); + static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection); virtual void callUserIdCallbacks() { } }; diff --git a/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 754b443c22..757f6efc59 100644 --- a/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} sys::ConnectionCodec* SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); CodecPtr c(new amqp_0_10::Connection(out, id, false)); @@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, return sc.release(); } - + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 64924bdd4c..9a84db547c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), - closeComplete(false) + closeComplete(false), + connectionId(getSession().getConnection().getUrl()) {} SemanticState::~SemanticState() { @@ -142,6 +143,7 @@ bool SemanticState::cancel(const string& tag) DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1)); unacked.erase(removed, unacked.end()); + getSession().setUnackedCount(unacked.size()); return true; } else { return false; @@ -270,6 +272,7 @@ void SemanticState::checkDtxTimeout() void SemanticState::record(const DeliveryRecord& delivery) { unacked.push_back(delivery); + getSession().setUnackedCount(unacked.size()); } const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); @@ -426,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(session.getBroker(), queue); + Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID); } } c->cancel(); @@ -555,6 +558,7 @@ void SemanticState::recover(bool requeue) //w.r.t id is lost sort(unacked.begin(), unacked.end()); } + getSession().setUnackedCount(unacked.size()); } void SemanticState::deliver(DeliveryRecord& msg, bool sync) @@ -712,6 +716,7 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver DeliveryRecords::iterator removed = remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1)); unacked.erase(removed, range.end); + getSession().setUnackedCount(unacked.size()); } void SemanticState::reject(DeliveryId first, DeliveryId last) @@ -723,6 +728,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) if (i->isRedundant()) i = unacked.erase(i); else i++; } + getSession().setUnackedCount(unacked.size()); } bool SemanticState::ConsumerImpl::doOutput() @@ -810,6 +816,7 @@ void SemanticState::accepted(const SequenceSet& commands) { (TransactionContext*) 0))); unacked.erase(removed, unacked.end()); } + getSession().setUnackedCount(unacked.size()); } void SemanticState::completed(const SequenceSet& commands) { @@ -819,6 +826,7 @@ void SemanticState::completed(const SequenceSet& commands) { bind(&SemanticState::complete, this, _1))); unacked.erase(removed, unacked.end()); requestDispatch(); + getSession().setUnackedCount(unacked.size()); } void SemanticState::attached() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index e5e1d2da16..15928ce599 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -146,6 +146,8 @@ class SemanticState : private boost::noncopyable { std::string getResumeId() const { return resumeId; }; const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } + uint32_t getDeliveryCount() const { return deliveryCount; } + void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } @@ -180,6 +182,8 @@ class SemanticState : private boost::noncopyable { const bool authMsg; const std::string userID; bool closeComplete; + //needed for queue delete events in auto-delete: + const std::string connectionId; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 78f2e43ce0..ae994a6bd5 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -41,6 +41,8 @@ namespace qpid { namespace broker { +using std::string; + using namespace qpid; using namespace qpid::framing; using namespace qpid::framing::dtx; @@ -107,6 +109,12 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const false, ManagementAgent::toMap(args), "existing")); + QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange + << " user:" << getConnection().getUserId() + << " rhost:" << getConnection().getUrl() + << " type:" << type + << " alternateExchange:" << alternateExchange + << " durable:" << (durable ? "T" : "F")); } }catch(UnknownExchangeTypeException& /*e*/){ throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type)); @@ -204,7 +212,10 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string } } -SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker()) +SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) + : HandlerHelper(session), broker(getBroker()), + //record connection id and userid for deleting exclsuive queues after session has ended: + connectionId(getConnection().getUrl()), userId(getConnection().getUserId()) {} @@ -223,7 +234,7 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q); + Queue::tryAutoDelete(broker, q, connectionId, userId); } exclusiveQueues.erase(exclusiveQueues.begin()); } @@ -307,6 +318,14 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), "existing")); + QPID_LOG_CAT(debug, model, "Create queue. name:" << name + << " user:" << getConnection().getUserId() + << " rhost:" << getConnection().getUrl() + << " durable:" << (durable ? "T" : "F") + << " exclusive:" << (exclusive ? "T" : "F") + << " autodelete:" << (autoDelete ? "T" : "F") + << " alternateExchange:" << alternateExchange + ); } } @@ -411,6 +430,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), queueName, destination, exclusive, ManagementAgent::toMap(arguments))); + QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName + << " destination:" << destination + << " user:" << getConnection().getUserId() + << " rhost:" << getConnection().getUrl() + << " exclusive:" << (exclusive ? "T" : "F") + ); } void @@ -423,6 +448,9 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); + QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination + << " user:" << getConnection().getUserId() + << " rhost:" << getConnection().getUrl() ); } void diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index bc056538b1..3cc745f96c 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -121,6 +121,9 @@ class Queue; { Broker& broker; std::vector< boost::shared_ptr<Queue> > exclusiveQueues; + //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues + std::string connectionId; + std::string userId; public: QueueHandlerImpl(SemanticState& session); diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index 253ce8dcf2..ee98da1878 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -47,6 +47,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual uint16_t getChannel() const = 0; virtual const SessionId& getSessionId() const = 0; virtual void addPendingExecutionSync() = 0; + virtual void setUnackedCount(uint64_t) {} }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index b58c7c01c5..23fa2ee0ca 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), proxy(out), - clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) + clusterOrderProxy(c.getClusterOrderOutput() ? + new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} SessionHandler::~SessionHandler() {} -void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { +void SessionHandler::connectionException( + framing::connection::CloseCode code, const std::string& msg) +{ // NOTE: must tell the error listener _before_ calling connection.close() - if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg); + if (connection.getErrorListener()) + connection.getErrorListener()->connectionError(msg); + if (errorListener) + errorListener->connectionException(code, msg); connection.close(code, msg); } -void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) { - if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +void SessionHandler::channelException( + framing::session::DetachCode code, const std::string& msg) +{ + if (connection.getErrorListener()) + connection.getErrorListener()->sessionError(getChannel(), msg); + if (errorListener) + errorListener->channelException(code, msg); } -void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) { - if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +void SessionHandler::executionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (connection.getErrorListener()) + connection.getErrorListener()->sessionError(getChannel(), msg); + if (errorListener) + errorListener->executionException(code, msg); } ConnectionState& SessionHandler::getConnection() { return connection; } @@ -64,7 +80,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); - if (detachedCallback) detachedCallback(); + if (errorListener) errorListener->detach(); connection.closeChannel(channel.get()); } @@ -118,8 +134,4 @@ void SessionHandler::attached(const std::string& name) } } -void SessionHandler::setDetachedCallback(boost::function<void()> cb) { - detachedCallback = cb; -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 4e2cfaa963..ab87cf41a4 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -25,7 +25,7 @@ #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { class SessionState; @@ -43,6 +43,21 @@ class SessionState; */ class SessionHandler : public amqp_0_10::SessionHandler { public: + class ErrorListener { + public: + virtual ~ErrorListener() {} + virtual void connectionException( + framing::connection::CloseCode code, const std::string& msg) = 0; + virtual void channelException( + framing::session::DetachCode, const std::string& msg) = 0; + virtual void executionException( + framing::execution::ErrorCode, const std::string& msg) = 0; + /** Called when it is safe to delete the ErrorListener. */ + virtual void detach() = 0; + }; + + /** + *@param e must not be deleted until ErrorListener::detach has been called */ SessionHandler(Connection&, framing::ChannelId); ~SessionHandler(); @@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { void attached(const std::string& name);//used by 'pushing' inter-broker bridges void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges - void setDetachedCallback(boost::function<void()> cb); + void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; } protected: virtual void setState(const std::string& sessionName, bool force); @@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; std::auto_ptr<SetChannelProxy> clusterOrderProxy; - boost::function<void ()> detachedCallback; + boost::shared_ptr<ErrorListener> errorListener; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 99407bc3a6..cc02d9ec94 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -156,7 +156,7 @@ ManagementObject* SessionState::GetManagementObject (void) const Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, Args& /*args*/, - string& /*text*/) + std::string& /*text*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 8db232a2d6..a8ff7feff9 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -126,6 +126,11 @@ class SessionState : public qpid::SessionState, // the SessionState of a received Execution.Sync command. void addPendingExecutionSync(); + void setUnackedCount(uint64_t count) { + if (mgmtObject) + mgmtObject->set_unackedMessages(count); + } + // Used to delay creation of management object for sessions // belonging to inter-broker bridges void addManagementObject(); diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 8cd2edda76..fa8df6406b 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -37,7 +37,6 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) if (agent != 0) { - framing::Uuid systemId; if (_dataDir.empty ()) { @@ -66,14 +65,13 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); - std::string sysname, nodename, release, version, machine; - qpid::sys::SystemInfo::getSystemId (sysname, - nodename, + qpid::sys::SystemInfo::getSystemId (osName, + nodeName, release, version, machine); - mgmtObject->set_osName (sysname); - mgmtObject->set_nodeName (nodename); + mgmtObject->set_osName (osName); + mgmtObject->set_nodeName (nodeName); mgmtObject->set_release (release); mgmtObject->set_version (version); mgmtObject->set_machine (machine); diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h index 0fc2c2bd88..6847c662ae 100644 --- a/cpp/src/qpid/broker/System.h +++ b/cpp/src/qpid/broker/System.h @@ -21,6 +21,7 @@ // #include "qpid/management/Manageable.h" +#include "qpid/framing/Uuid.h" #include "qmf/org/apache/qpid/broker/System.h" #include <boost/shared_ptr.hpp> #include <string> @@ -35,6 +36,8 @@ class System : public management::Manageable private: qmf::org::apache::qpid::broker::System* mgmtObject; + framing::Uuid systemId; + std::string osName, nodeName, release, version, machine; public: @@ -44,6 +47,20 @@ class System : public management::Manageable management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } + + + /** Persistent UUID assigned by the management system to this broker. */ + framing::Uuid getSystemId() const { return systemId; } + /** Returns the OS name; e.g., GNU/Linux or Windows */ + std::string getOsName() const { return osName; } + /** Returns the node name. Usually the same as the host name. */ + std::string getNodeName() const { return nodeName; } + /** Returns the OS release identifier. */ + std::string getRelease() const { return release; } + /** Returns the OS release version (kernel, build, sp, etc.) */ + std::string getVersion() const { return version; } + /** Returns the hardware type. */ + std::string getMachine() const { return machine; } }; }} diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index dd3ec13019..c11389bb17 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -32,20 +32,8 @@ using namespace qpid::sys; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -namespace -{ -const std::string STAR("*"); -const std::string HASH("#"); -} - // iterator for federation ReOrigin bind operation -class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator { +class TopicExchange::ReOriginIter : public BindingNode::TreeIterator { public: ReOriginIter() {}; ~ReOriginIter() {}; @@ -61,7 +49,7 @@ public: // match iterator used by route(): builds BindingList of all unique queues // that match the routing key. -class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator { +class TopicExchange::BindingsFinderIter : public BindingNode::TreeIterator { public: BindingsFinderIter(BindingList &bl) : b(bl) {}; ~BindingsFinderIter() {}; @@ -85,7 +73,7 @@ public: // Iterator to visit all bindings until a given queue is found -class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator { +class TopicExchange::QueueFinderIter : public BindingNode::TreeIterator { public: QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {}; ~QueueFinderIter() {}; @@ -107,58 +95,7 @@ public: }; -// Iterate over a string of '.'-separated tokens. -struct TopicExchange::TokenIterator { - typedef pair<const char*,const char*> Token; - - TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {} - - TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {} - - bool finished() const { return !token.first; } - - void next() { - if (token.second == end) - token.first = token.second = 0; - else { - token.first=token.second+1; - token.second=(find(token.first, end, '.')); - } - } - - void pop(string &top) { - ptrdiff_t l = len(); - if (l) { - top.assign(token.first, l); - } else top.clear(); - next(); - } - - bool match1(char c) const { - return token.second==token.first+1 && *token.first == c; - } - - bool match(const Token& token2) const { - ptrdiff_t l=len(); - return l == token2.second-token2.first && - strncmp(token.first, token2.first, l) == 0; - } - - bool match(const string& str) const { - ptrdiff_t l=len(); - return l == ptrdiff_t(str.size()) && - str.compare(0, l, token.first, l) == 0; - } - - ptrdiff_t len() const { return token.second - token.first; } - - - const char* end; - Token token; -}; - - -class TopicExchange::Normalizer : public TopicExchange::TokenIterator { +class TopicExchange::Normalizer : public TokenIterator { public: Normalizer(string& p) : TokenIterator(&p[0], &p[0]+p.size()), pattern(p) @@ -230,7 +167,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); - BindingKey *bk = bindingTree.addBindingKey(routingPattern); + BindingKey *bk = bindingTree.add(routingPattern); if (bk) { Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; @@ -324,7 +261,7 @@ bool TopicExchange::deleteBinding(Queue::shared_ptr queue, nBindings--; if(qv.empty()) { - bindingTree.removeBindingKey(routingKey); + bindingTree.remove(routingKey); } if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); @@ -340,7 +277,7 @@ bool TopicExchange::deleteBinding(Queue::shared_ptr queue, TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern) { // Note well: lock held by caller.... - BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern + BindingKey *bk = bindingTree.get(pattern); // Exact match against binding pattern if (!bk) return 0; Binding::vector& qv(bk->bindingVector); Binding::vector::iterator q; @@ -385,7 +322,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing } else if (!routingKey && !queue) { return nBindings > 0; } else if (routingKey) { - if (bindingTree.getBindingKey(*routingKey)) { + if (bindingTree.get(*routingKey)) { return true; } } else { @@ -400,294 +337,4 @@ TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); -// -// class BindingNode -// - -TopicExchange::BindingNode::~BindingNode() -{ - childTokens.clear(); -} - - -// Add a binding pattern to the tree. Return a pointer to the binding key -// of the node that matches the binding pattern. -TopicExchange::BindingKey* -TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey(normalizedRoute); - return addBindingKey(bKey, normalizedRoute); -} - - -// Return a pointer to the binding key of the leaf node that matches the binding pattern. -TopicExchange::BindingKey* -TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey(normalizedRoute); - return getBindingKey(bKey); -} - - -// Delete the binding associated with the given route. -void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute) -{ - TokenIterator bKey2(normalizedRoute); - removeBindingKey(bKey2, normalizedRoute); -} - -// visit each node in the tree. Note: all nodes are visited, -// even non-leaf nodes (i.e. nodes without any bindings) -bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter) -{ - if (!iter.visit(*this)) return false; - - if (starChild && !starChild->iterateAll(iter)) return false; - - if (hashChild && !hashChild->iterateAll(iter)) return false; - - for (ChildMap::iterator ptr = childTokens.begin(); - ptr != childTokens.end(); ptr++) { - - if (!ptr->second->iterateAll(iter)) return false; - } - - return true; -} - -// applies iter against only matching nodes until iter returns false -// Note Well: the iter may match against the same node more than once -// if # wildcards are present! -bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter) -{ - TopicExchange::TokenIterator rKey(routingKey); - return iterateMatch( rKey, iter ); -} - - -// recurse over binding using token iterator. -// Note well: bkey is modified! -TopicExchange::BindingKey* -TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey, - const string& fullPattern) -{ - if (bKey.finished()) { - // this node's binding - if (routePattern.empty()) { - routePattern = fullPattern; - } else assert(routePattern == fullPattern); - - return &bindings; - - } else { - // pop the topmost token & recurse... - - if (bKey.match(STAR)) { - if (!starChild) { - starChild.reset(new StarNode()); - } - bKey.next(); - return starChild->addBindingKey(bKey, fullPattern); - - } else if (bKey.match(HASH)) { - if (!hashChild) { - hashChild.reset(new HashNode()); - } - bKey.next(); - return hashChild->addBindingKey(bKey, fullPattern); - - } else { - ChildMap::iterator ptr; - std::string next_token; - bKey.pop(next_token); - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->addBindingKey(bKey, fullPattern); - } else { - BindingNode::shared_ptr child(new BindingNode(next_token)); - childTokens[next_token] = child; - return child->addBindingKey(bKey, fullPattern); - } - } - } -} - - -// Remove a binding pattern from the tree. Return true if the current -// node becomes a leaf without any bindings (therefore can be deleted). -// Note Well: modifies parameter bKey's value! -bool -TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey, - const string& fullPattern) -{ - bool remove; - - if (!bKey.finished()) { - - if (bKey.match(STAR)) { - bKey.next(); - if (starChild) { - remove = starChild->removeBindingKey(bKey, fullPattern); - if (remove) { - starChild.reset(); - } - } - } else if (bKey.match(HASH)) { - bKey.next(); - if (hashChild) { - remove = hashChild->removeBindingKey(bKey, fullPattern); - if (remove) { - hashChild.reset(); - } - } - } else { - ChildMap::iterator ptr; - std::string next_token; - bKey.pop(next_token); - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - remove = ptr->second->removeBindingKey(bKey, fullPattern); - if (remove) { - childTokens.erase(ptr); - } - } - } - } - - // no bindings and no children == parent can delete this node. - return getChildCount() == 0 && bindings.bindingVector.empty(); -} - - -// find the binding key that matches the given binding pattern. -// Note Well: modifies key parameter! -TopicExchange::BindingKey* -TopicExchange::BindingNode::getBindingKey(TokenIterator &key) -{ - if (key.finished()) { - return &bindings; - } - - string next_token; - - key.pop(next_token); - - if (next_token == STAR) { - if (starChild) - return starChild->getBindingKey(key); - } else if (next_token == HASH) { - if (hashChild) - return hashChild->getBindingKey(key); - } else { - ChildMap::iterator ptr; - ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->getBindingKey(key); - } - } - - return 0; -} - - - -// iterate over all nodes that match the given key. Note well: the set of nodes -// that are visited includes matching non-leaf nodes. -// Note well: parameter key is modified! -bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // invariant: key has matched all previous tokens up to this node. - if (key.finished()) { - // exact match this node: visit if bound - if (!bindings.bindingVector.empty()) - if (!iter.visit(*this)) return false; - } - - // check remaining key against children, even if empty. - return iterateMatchChildren(key, iter); -} - - -TopicExchange::StarNode::StarNode() - : BindingNode(STAR) {} - - -// See iterateMatch() above. -// Special case: this node must verify a token is available (match exactly one). -bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // must match one token: - if (key.finished()) - return true; // match failed, but continue iteration on siblings - - // pop the topmost token - key.next(); - - if (key.finished()) { - // exact match this node: visit if bound - if (!bindings.bindingVector.empty()) - if (!iter.visit(*this)) return false; - } - - return iterateMatchChildren(key, iter); -} - - -TopicExchange::HashNode::HashNode() - : BindingNode(HASH) {} - - -// See iterateMatch() above. -// Special case: can match zero or more tokens at the head of the key. -bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter) -{ - // consume each token and look for a match on the - // remaining key. - while (!key.finished()) { - if (!iterateMatchChildren(key, iter)) return false; - key.next(); - } - - if (!bindings.bindingVector.empty()) - return iter.visit(*this); - - return true; -} - - -// helper: iterate over current node's matching children -bool -TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key, - TopicExchange::BindingNode::TreeIterator& iter) -{ - // always try glob - it can match empty keys - if (hashChild) { - TokenIterator tmp(key); - if (!hashChild->iterateMatch(tmp, iter)) - return false; - } - - if (!key.finished()) { - - if (starChild) { - TokenIterator tmp(key); - if (!starChild->iterateMatch(tmp, iter)) - return false; - } - - if (!childTokens.empty()) { - TokenIterator newKey(key); - std::string next_token; - newKey.pop(next_token); - - ChildMap::iterator ptr = childTokens.find(next_token); - if (ptr != childTokens.end()) { - return ptr->second->iterateMatch(newKey, iter); - } - } - } - - return true; -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index cc24e1411e..46871a1c6b 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -28,6 +28,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/TopicKeyNode.h" namespace qpid { @@ -35,7 +36,6 @@ namespace broker { class TopicExchange : public virtual Exchange { - struct TokenIterator; class Normalizer; struct BindingKey { // binding for this node @@ -43,129 +43,44 @@ class TopicExchange : public virtual Exchange { FedBinding fedBinding; }; - // Binding database: - // The dotted form of a binding key is broken up and stored in a directed tree graph. - // Common binding prefix are merged. This allows the route match alogrithm to quickly - // isolate those sub-trees that match a given routingKey. - // For example, given the routes: - // a.b.c.<...> - // a.b.d.<...> - // a.x.y.<...> - // The resulting tree would be: - // a-->b-->c-->... - // | +-->d-->... - // +-->x-->y-->... - // - class QPID_BROKER_CLASS_EXTERN BindingNode { - public: - - typedef boost::shared_ptr<BindingNode> shared_ptr; - - // for database transversal (visit a node). - class TreeIterator { - public: - TreeIterator() {}; - virtual ~TreeIterator() {}; - virtual bool visit(BindingNode& node) = 0; - }; - - BindingNode() {}; - BindingNode(const std::string& token) : token(token) {}; - QPID_BROKER_EXTERN virtual ~BindingNode(); - - // add normalizedRoute to tree, return associated BindingKey - QPID_BROKER_EXTERN BindingKey* addBindingKey(const std::string& normalizedRoute); - - // return BindingKey associated with normalizedRoute - QPID_BROKER_EXTERN BindingKey* getBindingKey(const std::string& normalizedRoute); - - // remove BindingKey associated with normalizedRoute - QPID_BROKER_EXTERN void removeBindingKey(const std::string& normalizedRoute); - - // applies iter against each node in tree until iter returns false - QPID_BROKER_EXTERN bool iterateAll(TreeIterator& iter); - - // applies iter against only matching nodes until iter returns false - QPID_BROKER_EXTERN bool iterateMatch(const std::string& routingKey, TreeIterator& iter); - - std::string routePattern; // normalized binding that matches this node - BindingKey bindings; // for matches against this node - - protected: - - std::string token; // portion of pattern represented by this node - - // children - typedef std::map<const std::string, BindingNode::shared_ptr> ChildMap; - ChildMap childTokens; - BindingNode::shared_ptr starChild; // "*" subtree - BindingNode::shared_ptr hashChild; // "#" subtree - - unsigned int getChildCount() { return childTokens.size() + - (starChild ? 1 : 0) + (hashChild ? 1 : 0); } - BindingKey* addBindingKey(TokenIterator& bKey, - const std::string& fullPattern); - bool removeBindingKey(TokenIterator& bKey, - const std::string& fullPattern); - BindingKey* getBindingKey(TokenIterator& bKey); - QPID_BROKER_EXTERN virtual bool iterateMatch(TokenIterator& rKey, TreeIterator& iter); - bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter); - }; - - // Special case: ("*" token) Node in the tree for a match exactly one wildcard - class StarNode : public BindingNode { - public: - StarNode(); - ~StarNode() {}; - - protected: - virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter); - }; + typedef TopicKeyNode<BindingKey> BindingNode; - // Special case: ("#" token) Node in the tree for a match zero or more - class HashNode : public BindingNode { - public: - HashNode(); - ~HashNode() {}; + BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern); + bool deleteBinding(Queue::shared_ptr queue, + const std::string& routingKey, + BindingKey *bk); - protected: - virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter); - }; + class ReOriginIter; + class BindingsFinderIter; + class QueueFinderIter; BindingNode bindingTree; unsigned long nBindings; qpid::sys::RWlock lock; // protects bindingTree and nBindings qpid::sys::RWlock cacheLock; // protects cache std::map<std::string, BindingList> bindingCache; // cache of matched routes. + class ClearCache { private: qpid::sys::RWlock* cacheLock; std::map<std::string, BindingList>* bindingCache; - bool cleared; + bool cleared; public: - ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l), - bindingCache(bc),cleared(false) {}; + ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc) : + cacheLock(l), bindingCache(bc),cleared(false) {}; void clearCache() { - qpid::sys::RWlock::ScopedWlock l(*cacheLock); - if (!cleared) { - bindingCache->clear(); - cleared =true; - } + qpid::sys::RWlock::ScopedWlock l(*cacheLock); + if (!cleared) { + bindingCache->clear(); + cleared =true; + } }; ~ClearCache(){ - clearCache(); + clearCache(); }; }; - BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern); - bool deleteBinding(Queue::shared_ptr queue, - const std::string& routingKey, - BindingKey *bk); - class ReOriginIter; - class BindingsFinderIter; - class QueueFinderIter; - - public: +public: static const std::string typeName; static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern); @@ -199,7 +114,6 @@ class TopicExchange : public virtual Exchange { }; - } } diff --git a/cpp/src/qpid/broker/TopicKeyNode.h b/cpp/src/qpid/broker/TopicKeyNode.h new file mode 100644 index 0000000000..7671ed069d --- /dev/null +++ b/cpp/src/qpid/broker/TopicKeyNode.h @@ -0,0 +1,371 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QPID_BROKER_TOPIC_KEY_NODE_ +#define _QPID_BROKER_TOPIC_KEY_NODE_ + +#include "qpid/broker/BrokerImportExport.h" +#include <boost/shared_ptr.hpp> +#include <map> +#include <string> +#include <string.h> + + +namespace qpid { +namespace broker { + +static const std::string STAR("*"); +static const std::string HASH("#"); + + +// Iterate over a string of '.'-separated tokens. +struct TokenIterator { + typedef std::pair<const char*,const char*> Token; + + TokenIterator(const char* b, const char* e) : end(e), token(std::make_pair(b, std::find(b,e,'.'))) {} + + TokenIterator(const std::string& key) : end(&key[0]+key.size()), token(std::make_pair(&key[0], std::find(&key[0],end,'.'))) {} + + bool finished() const { return !token.first; } + + void next() { + if (token.second == end) + token.first = token.second = 0; + else { + token.first=token.second+1; + token.second=(std::find(token.first, end, '.')); + } + } + + void pop(std::string &top) { + ptrdiff_t l = len(); + if (l) { + top.assign(token.first, l); + } else top.clear(); + next(); + } + + bool match1(char c) const { + return token.second==token.first+1 && *token.first == c; + } + + bool match(const Token& token2) const { + ptrdiff_t l=len(); + return l == token2.second-token2.first && + strncmp(token.first, token2.first, l) == 0; + } + + bool match(const std::string& str) const { + ptrdiff_t l=len(); + return l == ptrdiff_t(str.size()) && + str.compare(0, l, token.first, l) == 0; + } + + ptrdiff_t len() const { return token.second - token.first; } + + + const char* end; + Token token; +}; + + +// Binding database: +// The dotted form of a binding key is broken up and stored in a directed tree graph. +// Common binding prefix are merged. This allows the route match alogrithm to quickly +// isolate those sub-trees that match a given routingKey. +// For example, given the routes: +// a.b.c.<...> +// a.b.d.<...> +// a.x.y.<...> +// The resulting tree would be: +// a-->b-->c-->... +// | +-->d-->... +// +-->x-->y-->... +// +template <class T> +class QPID_BROKER_CLASS_EXTERN TopicKeyNode { + + public: + + typedef boost::shared_ptr<TopicKeyNode> shared_ptr; + + // for database transversal (visit a node). + class TreeIterator { + public: + TreeIterator() {}; + virtual ~TreeIterator() {}; + virtual bool visit(TopicKeyNode& node) = 0; + }; + + TopicKeyNode() : isStar(false), isHash(false) {} + TopicKeyNode(const std::string& _t) : token(_t), isStar(_t == STAR), isHash(_t == HASH) {} + QPID_BROKER_EXTERN virtual ~TopicKeyNode() { + childTokens.clear(); + } + + // add normalizedRoute to tree, return associated T + QPID_BROKER_EXTERN T* add(const std::string& normalizedRoute) { + TokenIterator bKey(normalizedRoute); + return add(bKey, normalizedRoute); + } + + // return T associated with normalizedRoute + QPID_BROKER_EXTERN T* get(const std::string& normalizedRoute) { + TokenIterator bKey(normalizedRoute); + return get(bKey); + } + + // remove T associated with normalizedRoute + QPID_BROKER_EXTERN void remove(const std::string& normalizedRoute) { + TokenIterator bKey2(normalizedRoute); + remove(bKey2, normalizedRoute); + } + + // applies iter against each node in tree until iter returns false + QPID_BROKER_EXTERN bool iterateAll(TreeIterator& iter) { + if (!iter.visit(*this)) return false; + if (starChild && !starChild->iterateAll(iter)) return false; + if (hashChild && !hashChild->iterateAll(iter)) return false; + for (typename ChildMap::iterator ptr = childTokens.begin(); + ptr != childTokens.end(); ptr++) { + if (!ptr->second->iterateAll(iter)) return false; + } + return true; + } + + // applies iter against only matching nodes until iter returns false + QPID_BROKER_EXTERN bool iterateMatch(const std::string& routingKey, TreeIterator& iter) { + TokenIterator rKey(routingKey); + return iterateMatch( rKey, iter ); + } + + std::string routePattern; // normalized binding that matches this node + T bindings; // for matches against this node + + private: + + std::string token; // portion of pattern represented by this node + bool isStar; + bool isHash; + + // children + typedef std::map<const std::string, typename TopicKeyNode::shared_ptr> ChildMap; + ChildMap childTokens; + typename TopicKeyNode::shared_ptr starChild; // "*" subtree + typename TopicKeyNode::shared_ptr hashChild; // "#" subtree + + unsigned int getChildCount() { return childTokens.size() + + (starChild ? 1 : 0) + (hashChild ? 1 : 0); } + + T* add(TokenIterator& bKey, const std::string& fullPattern){ + if (bKey.finished()) { + // this node's binding + if (routePattern.empty()) { + routePattern = fullPattern; + } else assert(routePattern == fullPattern); + + return &bindings; + + } else { + // pop the topmost token & recurse... + + if (bKey.match(STAR)) { + if (!starChild) { + starChild.reset(new TopicKeyNode<T>(STAR)); + } + bKey.next(); + return starChild->add(bKey, fullPattern); + + } else if (bKey.match(HASH)) { + if (!hashChild) { + hashChild.reset(new TopicKeyNode<T>(HASH)); + } + bKey.next(); + return hashChild->add(bKey, fullPattern); + + } else { + typename ChildMap::iterator ptr; + std::string next_token; + bKey.pop(next_token); + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->add(bKey, fullPattern); + } else { + typename TopicKeyNode::shared_ptr child(new TopicKeyNode<T>(next_token)); + childTokens[next_token] = child; + return child->add(bKey, fullPattern); + } + } + } + } + + + bool remove(TokenIterator& bKey, const std::string& fullPattern) { + bool remove; + if (!bKey.finished()) { + if (bKey.match(STAR)) { + bKey.next(); + if (starChild) { + remove = starChild->remove(bKey, fullPattern); + if (remove) { + starChild.reset(); + } + } + } else if (bKey.match(HASH)) { + bKey.next(); + if (hashChild) { + remove = hashChild->remove(bKey, fullPattern); + if (remove) { + hashChild.reset(); + } + } + } else { + typename ChildMap::iterator ptr; + std::string next_token; + bKey.pop(next_token); + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + remove = ptr->second->remove(bKey, fullPattern); + if (remove) { + childTokens.erase(ptr); + } + } + } + } + + // no bindings and no children == parent can delete this node. + return getChildCount() == 0 && bindings.bindingVector.empty(); + } + + + T* get(TokenIterator& bKey) { + if (bKey.finished()) { + return &bindings; + } + + std::string next_token; + bKey.pop(next_token); + + if (next_token == STAR) { + if (starChild) + return starChild->get(bKey); + } else if (next_token == HASH) { + if (hashChild) + return hashChild->get(bKey); + } else { + typename ChildMap::iterator ptr; + ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->get(bKey); + } + } + + return 0; + } + + + bool iterateMatch(TokenIterator& rKey, TreeIterator& iter) { + if (isStar) return iterateMatchStar(rKey, iter); + if (isHash) return iterateMatchHash(rKey, iter); + return iterateMatchString(rKey, iter); + } + + + bool iterateMatchString(TokenIterator& rKey, TreeIterator& iter){ + // invariant: key has matched all previous tokens up to this node. + if (rKey.finished()) { + // exact match this node: visit if bound + if (!bindings.bindingVector.empty()) + if (!iter.visit(*this)) return false; + } + + // check remaining key against children, even if empty. + return iterateMatchChildren(rKey, iter); + } + + + bool iterateMatchStar(TokenIterator& rKey, TreeIterator& iter) { + // must match one token: + if (rKey.finished()) + return true; // match failed, but continue iteration on siblings + + // pop the topmost token + rKey.next(); + + if (rKey.finished()) { + // exact match this node: visit if bound + if (!bindings.bindingVector.empty()) + if (!iter.visit(*this)) return false; + } + + return iterateMatchChildren(rKey, iter); + } + + + bool iterateMatchHash(TokenIterator& rKey, TreeIterator& iter) { + // consume each token and look for a match on the + // remaining key. + while (!rKey.finished()) { + if (!iterateMatchChildren(rKey, iter)) return false; + rKey.next(); + } + + if (!bindings.bindingVector.empty()) + return iter.visit(*this); + + return true; + } + + + bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter) { + // always try glob - it can match empty keys + if (hashChild) { + TokenIterator tmp(key); + if (!hashChild->iterateMatch(tmp, iter)) + return false; + } + + if (!key.finished()) { + if (starChild) { + TokenIterator tmp(key); + if (!starChild->iterateMatch(tmp, iter)) + return false; + } + + if (!childTokens.empty()) { + TokenIterator newKey(key); + std::string next_token; + newKey.pop(next_token); + + typename ChildMap::iterator ptr = childTokens.find(next_token); + if (ptr != childTokens.end()) { + return ptr->second->iterateMatch(newKey, iter); + } + } + } + + return true; + } +}; + +} +} + +#endif diff --git a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp index a38e6ac12a..40e74be018 100644 --- a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp @@ -32,6 +32,8 @@ using namespace qpid::framing; using qpid::sys::SecurityLayer; +using std::string; + namespace qpid { namespace broker { @@ -79,7 +81,7 @@ void SaslAuthenticator::fini(void) return; } -std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool) +std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c) { if (c.getBroker().getOptions().auth) { return std::auto_ptr<SaslAuthenticator>(new SspiAuthenticator(c)); diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index 1dff1ddc8f..fb59d058f8 100644 --- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -44,26 +44,34 @@ namespace qpid { namespace sys { + +class Timer; + namespace windows { struct SslServerOptions : qpid::Options { std::string certStore; + std::string certStoreLocation; std::string certName; uint16_t port; bool clientAuth; SslServerOptions() : qpid::Options("SSL Options"), - certStore("My"), port(5671), clientAuth(false) + certStore("My"), + certStoreLocation("CurrentUser"), + certName("localhost"), + port(5671), + clientAuth(false) { qpid::Address me; if (qpid::sys::SystemInfo::getLocalHostname(me)) certName = me.host; - else - certName = "localhost"; addOptions() ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate") + ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"), + "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )") ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use") ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections") ("ssl-require-client-authentication", optValue(clientAuth), @@ -72,10 +80,12 @@ struct SslServerOptions : qpid::Options }; class SslProtocolFactory : public qpid::sys::ProtocolFactory { - const bool tcpNoDelay; boost::ptr_vector<Socket> listeners; boost::ptr_vector<AsynchAcceptor> acceptors; + Timer& brokerTimer; + uint32_t maxNegotiateTime; uint16_t listeningPort; + const bool tcpNoDelay; std::string brokerHost; const bool clientAuthSelected; std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor; @@ -83,7 +93,9 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory { CredHandle credHandle; public: - SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay); + SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime); ~SslProtocolFactory(); void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*); void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port, @@ -120,8 +132,8 @@ static struct SslPlugin : public Plugin { const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options, "", boost::lexical_cast<std::string>(options.port), - opts.connectionBacklog, - opts.tcpNoDelay)); + opts.connectionBacklog, opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime)); QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort()); broker->registerProtocolFactory("ssl", protocol); } catch (const std::exception& e) { @@ -132,9 +144,12 @@ static struct SslPlugin : public Plugin { } sslPlugin; SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, - const std::string& host, const std::string& port, int backlog, - bool nodelay) - : tcpNoDelay(nodelay), + const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime) + : brokerTimer(timer), + maxNegotiateTime(maxTime), + tcpNoDelay(nodelay), clientAuthSelected(options.clientAuth) { // Make sure that certificate store is good before listening to sockets @@ -142,11 +157,25 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, SecInvalidateHandle(&credHandle); // Get the certificate for this server. + DWORD flags = 0; + std::string certStoreLocation = options.certStoreLocation; + std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ if (certStoreLocation == "currentuser") { + flags = CERT_SYSTEM_STORE_CURRENT_USER; + } else if (certStoreLocation == "localmachine") { + flags = CERT_SYSTEM_STORE_LOCAL_MACHINE; + } else if (certStoreLocation == "currentservice") { + flags = CERT_SYSTEM_STORE_CURRENT_SERVICE; + } else { + QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation + << " - Using default location"); + } HCERTSTORE certStoreHandle; certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, X509_ASN_ENCODING, 0, - CERT_SYSTEM_STORE_LOCAL_MACHINE, + flags | + CERT_STORE_READONLY_FLAG, options.certStore.c_str()); if (!certStoreHandle) throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError()))); @@ -252,7 +281,7 @@ void SslProtocolFactory::established(sys::Poller::shared_ptr poller, boost::bind(&AsynchIOHandler::idle, async, _1)); } - async->init(aio, 4); + async->init(aio, brokerTimer, maxNegotiateTime, 4); aio->start(poller); } |
