summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/broker
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AclModule.h6
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp155
-rw-r--r--cpp/src/qpid/broker/Bridge.h54
-rw-r--r--cpp/src/qpid/broker/Broker.cpp266
-rw-r--r--cpp/src/qpid/broker/Broker.h46
-rw-r--r--cpp/src/qpid/broker/ConfigurationObserver.h61
-rw-r--r--cpp/src/qpid/broker/ConfigurationObservers.h72
-rw-r--r--cpp/src/qpid/broker/Connection.cpp68
-rw-r--r--cpp/src/qpid/broker/Connection.h30
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp11
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp53
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h4
-rw-r--r--cpp/src/qpid/broker/ConnectionObservers.h28
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/Daemon.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Exchange.h5
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp78
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/Link.cpp336
-rw-r--r--cpp/src/qpid/broker/Link.h47
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp254
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h62
-rw-r--r--cpp/src/qpid/broker/Message.cpp12
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp45
-rw-r--r--cpp/src/qpid/broker/MessageDeque.h2
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp11
-rw-r--r--cpp/src/qpid/broker/MessageMap.h3
-rw-r--r--cpp/src/qpid/broker/Messages.h9
-rw-r--r--cpp/src/qpid/broker/NameGenerator.h1
-rw-r--r--cpp/src/qpid/broker/Observers.h69
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h1
-rw-r--r--cpp/src/qpid/broker/PrivateImplRef.h8
-rw-r--r--cpp/src/qpid/broker/Queue.cpp117
-rw-r--r--cpp/src/qpid/broker/Queue.h20
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp4
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp66
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h18
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp8
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp78
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.h2
-rw-r--r--cpp/src/qpid/broker/SecureConnectionFactory.cpp11
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp12
-rw-r--r--cpp/src/qpid/broker/SemanticState.h4
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp32
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h3
-rw-r--r--cpp/src/qpid/broker/SessionContext.h1
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp36
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h21
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.h5
-rw-r--r--cpp/src/qpid/broker/System.cpp10
-rw-r--r--cpp/src/qpid/broker/System.h17
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp369
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h126
-rw-r--r--cpp/src/qpid/broker/TopicKeyNode.h371
-rw-r--r--cpp/src/qpid/broker/windows/SaslAuthenticator.cpp4
-rw-r--r--cpp/src/qpid/broker/windows/SslProtocolFactory.cpp53
63 files changed, 2107 insertions, 1105 deletions
diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h
index ff9281b6fc..7c180439cf 100644
--- a/cpp/src/qpid/broker/AclModule.h
+++ b/cpp/src/qpid/broker/AclModule.h
@@ -113,6 +113,7 @@ namespace acl {
namespace broker {
+ class Connection;
class AclModule
{
@@ -139,6 +140,11 @@ namespace broker {
// Add specialized authorise() methods as required.
+ /** Approve connection by counting connections total, per-IP, and
+ * per-user.
+ */
+ virtual bool approveConnection (const Connection& connection)=0;
+
virtual ~AclModule() {};
};
} // namespace broker
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 5b531e4636..d1706b5907 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -57,22 +57,25 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
conn->received(frame);
}
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args,
- InitializeCallback init) :
- link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
- initialize(init), detached(false)
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init, const std::string& _queueName, const string& ae) :
+ link(_link), channel(_id), args(_args), mgmtObject(0),
+ listener(l), name(_name),
+ queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+ : _queueName),
+ altEx(ae), persistenceId(0),
+ connState(0), conn(0), initialize(init), detached(false),
+ useExistingQueue(!_queueName.empty()),
+ sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
- std::stringstream title;
- title << id << "_" << name;
- queueName += title.str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -90,23 +93,22 @@ void Bridge::create(Connection& c)
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- SessionHandler& sessionHandler = c.getChannel(id);
- sessionHandler.setDetachedCallback(
- boost::bind(&Bridge::sessionDetached, shared_from_this()));
+ SessionHandler& sessionHandler = c.getChannel(channel);
+ sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(name, false);
+ session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(name);
+ sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -115,7 +117,7 @@ void Bridge::create(Connection& c)
if (initialize) initialize(*this, sessionHandler);
else if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
@@ -138,12 +140,13 @@ void Bridge::create(Connection& c)
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
+ bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
if (args.i_dynamic) {
@@ -163,11 +166,12 @@ void Bridge::cancel(Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
- peer->getSession().detach(name);
+ peer->getSession().detach(sessionName);
}
QPID_LOG(debug, "Cancelled bridge " << name);
}
+/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
@@ -177,9 +181,10 @@ void Bridge::closed()
QPID_LOG(debug, "Closed bridge " << name);
}
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
{
- listener(this);
+ listener(this); // ask the LinkRegistry to destroy us
}
void Bridge::setPersistenceId(uint64_t pId) const
@@ -187,8 +192,21 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string src;
@@ -196,9 +214,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
string key;
string id;
string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. Try to find a link using those paramters.
+ */
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
- buffer.getShortString(host);
- port = buffer.getShort();
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
@@ -210,15 +252,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
- return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic, sync).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
{
- buffer.putShortString(string("bridge"));
- buffer.putShortString(link->getHost());
- buffer.putShort(link->getPort());
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
@@ -233,9 +281,9 @@ void Bridge::encode(Buffer& buffer) const
uint32_t Bridge::encodedSize() const
{
- return link->getHost().size() + 1 // short-string (host)
- + 7 // short-string ("bridge")
- + 2 // port
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + link->getName().size() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
@@ -259,7 +307,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
- destroy();
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -306,7 +355,7 @@ void Bridge::sendReorigin()
}
bool Bridge::resetProxy()
{
- SessionHandler& sessionHandler = conn->getChannel(id);
+ SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
@@ -318,7 +367,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang
peer->getExchange().bind(queue, exchange, key, args);
} else {
QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
- destroy();
+ close();
}
}
@@ -333,8 +382,38 @@ const string& Bridge::getLocalTag() const
return link->getBroker()->getFederationTag();
}
-void Bridge::sessionDetached() {
+// SessionHandler::ErrorListener methods.
+void Bridge::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->connectionException(code, msg);
+}
+
+void Bridge::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->channelException(code, msg);
+}
+
+void Bridge::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->executionException(code, msg);
+}
+
+void Bridge::detach() {
detached = true;
+ if (errorListener) errorListener->detach();
+}
+
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
}
}}
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 32b9fd1781..ee298afd45 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -29,6 +29,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/SessionHandler.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -43,29 +44,31 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
-class SessionHandler;
class Bridge : public PersistableConfig,
public management::Manageable,
public Exchange::DynamicBridge,
+ public SessionHandler::ErrorListener,
public boost::enable_shared_from_this<Bridge>
{
-public:
+ public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
- Bridge(Link* link, framing::ChannelId id, CancellationListener l,
+ Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
- InitializeCallback init
+ InitializeCallback init, const std::string& queueName="",
+ const std::string& altExchange=""
);
~Bridge();
- void create(Connection& c);
- void cancel(Connection& c);
- void closed();
- void destroy();
+ QPID_BROKER_EXTERN void close();
bool isDurable() { return args.i_durable; }
+ Link *getLink() const { return link; }
+ const std::string getSrc() const { return args.i_src; }
+ const std::string getDest() const { return args.i_dest; }
+ const std::string getKey() const { return args.i_key; }
bool isDetached() const { return detached; }
@@ -80,7 +83,11 @@ public:
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const { return name; }
+
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedBridge(const std::string& key);
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
@@ -93,10 +100,20 @@ public:
std::string getQueueName() const { return queueName; }
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
-private:
- // Callback when the bridge's session is detached.
- void sessionDetached();
+ /** create a name for a bridge (if none supplied by user config) */
+ static std::string createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
+
+ // SessionHandler::ErrorListener methods.
+ void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ void channelException(framing::session::DetachCode, const std::string& msg);
+ void executionException(framing::execution::ErrorCode, const std::string& msg);
+ void detach();
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -108,19 +125,30 @@ private:
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
- Link* link;
- framing::ChannelId id;
+ Link* const link;
+ const framing::ChannelId channel;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
std::string name;
std::string queueName;
+ std::string altEx;
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
bool detached; // Set when session is detached.
bool resetProxy();
+
+ // connection Management (called by owning Link)
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
+ friend class Link; // to call create, cancel, closed()
+ boost::shared_ptr<ErrorListener> errorListener;
+
+ const bool useExistingQueue;
+ const std::string sessionName;
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index f20cce18a2..b763dd4119 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -108,7 +108,6 @@ Broker::Options::Options(const std::string& name) :
noDataDir(0),
port(DEFAULT_PORT),
workerThreads(5),
- maxConnections(500),
connectionBacklog(10),
enableMgmt(1),
mgmtPublish(1),
@@ -128,8 +127,10 @@ Broker::Options::Options(const std::string& name) :
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
- timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
- linkMaintenanceInterval(2)
+ timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
+ linkMaintenanceInterval(2),
+ linkHeartbeatInterval(120),
+ maxNegotiateTime(2000) // 2s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -146,7 +147,6 @@ Broker::Options::Options(const std::string& name) :
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
- ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
@@ -171,6 +171,9 @@ Broker::Options::Options(const std::string& name) :
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
+ ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
+ ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+ ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
}
@@ -208,7 +211,6 @@ Broker::Broker(const Broker::Options& conf) :
inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
- connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
{
@@ -227,7 +229,6 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
- mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
mgmtObject->set_mgmtPublish(conf.mgmtPublish);
@@ -244,8 +245,11 @@ Broker::Broker(const Broker::Options& conf) :
// management schema correct.
Vhost* vhost = new Vhost(this, this);
vhostObject = Vhost::shared_ptr(vhost);
- framing::Uuid uuid(managementAgent->getUuid());
- federationTag = uuid.str();
+ if (conf.fedTag.empty()) {
+ framing::Uuid uuid(managementAgent->getUuid());
+ federationTag = uuid.str();
+ } else
+ federationTag = conf.fedTag;
vhostObject->setFederationTag(federationTag);
queues.setParent(vhost);
@@ -254,8 +258,11 @@ Broker::Broker(const Broker::Options& conf) :
} else {
// Management is disabled so there is no broker management ID.
// Create a unique uuid to use as the federation tag.
- framing::Uuid uuid(true);
- federationTag = uuid.str();
+ if (conf.fedTag.empty()) {
+ framing::Uuid uuid(true);
+ federationTag = uuid.str();
+ } else
+ federationTag = conf.fedTag;
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
@@ -346,7 +353,7 @@ Broker::Broker(const Broker::Options& conf) :
knownBrokers.push_back(Url(conf.knownHosts));
}
- } catch (const std::exception& /*e*/) {
+ } catch (const std::exception&) {
finalize();
throw;
}
@@ -443,7 +450,7 @@ Manageable* Broker::GetVhostObject(void) const
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& args,
- string&)
+ string& text)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -458,6 +465,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
+ /** Management is creating a Link to a remote broker using the host and port of
+ * the remote. This (old) interface does not allow management to specify a name
+ * for the link, nor does it allow multiple Links to the same remote. Use the
+ * "create()" broker method if these features are needed.
+ * TBD: deprecate this interface.
+ */
+ QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -466,13 +481,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
+ text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
}
- std::pair<Link::shared_ptr, bool> response =
- links.declare (hp.i_host, hp.i_port, transport, hp.i_durable,
- hp.i_authMechanism, hp.i_username, hp.i_password);
- if (hp.i_durable && response.second)
- store->create(*response.first);
+
+ // Does a link to the remote already exist? If so, re-use the existing link
+ // - this behavior is backward compatible with previous releases.
+ if (!links.getLink(hp.i_host, hp.i_port, transport)) {
+ // new link, need to generate a unique name for it
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+ hp.i_host, hp.i_port, transport,
+ hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
+ if (!response.first) {
+ text = "Unable to create Link";
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ break;
+ }
+ }
status = Manageable::STATUS_OK;
break;
}
@@ -543,6 +569,8 @@ const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
+const std::string TYPE_LINK("link");
+const std::string TYPE_BRIDGE("bridge");
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
@@ -554,6 +582,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
const std::string _TRUE("true");
const std::string _FALSE("false");
+
+// parameters for creating a Link object, see mgmt schema
+const std::string HOST("host");
+const std::string PORT("port");
+const std::string TRANSPORT("transport");
+const std::string AUTH_MECHANISM("authMechanism");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+
+// parameters for creating a Bridge object, see mgmt schema
+const std::string LINK("link");
+const std::string SRC("src");
+const std::string DEST("dest");
+const std::string KEY("key");
+const std::string TAG("tag");
+const std::string EXCLUDES("excludes");
+const std::string SRC_IS_QUEUE("srcIsQueue");
+const std::string SRC_IS_LOCAL("srcIsLocal");
+const std::string DYNAMIC("dynamic");
+const std::string SYNC("sync");
}
struct InvalidBindingIdentifier : public qpid::Exception
@@ -603,6 +651,25 @@ struct UnknownObjectType : public qpid::Exception
std::string getPrefix() const { return "unknown object type"; }
};
+struct ReservedObjectName : public qpid::Exception
+{
+ ReservedObjectName(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return std::string("names prefixed with '")
+ + QPID_NAME_PREFIX + std::string("' are reserved"); }
+};
+
+struct UnsupportedTransport : public qpid::Exception
+{
+ UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "transport is not supported"; }
+};
+
+struct InvalidParameter : public qpid::Exception
+{
+ InvalidParameter(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "invalid parameter to method call"; }
+};
+
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
{
@@ -674,6 +741,113 @@ void Broker::createObject(const std::string& type, const std::string& name,
amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+
+ } else if (type == TYPE_LINK) {
+
+ QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string host;
+ uint16_t port = 0;
+ std::string transport = TCP_TRANSPORT;
+ bool durable = false;
+ std::string authMech, username, password;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == HOST) host = i->second.asString();
+ else if (i->first == PORT) port = i->second.asUint16();
+ else if (i->first == TRANSPORT) transport = i->second.asString();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
+ else if (i->first == USERNAME) username = i->second.asString();
+ else if (i->first == PASSWORD) password = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ if (!getProtocolFactory(transport)) {
+ QPID_LOG(error, "Transport '" << transport << "' not supported.");
+ throw UnsupportedTransport(transport);
+ }
+
+ std::pair<boost::shared_ptr<Link>, bool> rc;
+ rc = links.declare(name, host, port, transport, durable, authMech, username, password);
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
+ "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
+
+ } else if (type == TYPE_BRIDGE) {
+
+ QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string linkName;
+ std::string src;
+ std::string dest;
+ std::string key;
+ std::string id;
+ std::string excludes;
+ std::string queueName;
+ bool durable = false;
+ bool srcIsQueue = false;
+ bool srcIsLocal = false;
+ bool dynamic = false;
+ uint16_t sync = 0;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+
+ if (i->first == LINK) linkName = i->second.asString();
+ else if (i->first == SRC) src = i->second.asString();
+ else if (i->first == DEST) dest = i->second.asString();
+ else if (i->first == KEY) key = i->second.asString();
+ else if (i->first == TAG) id = i->second.asString();
+ else if (i->first == EXCLUDES) excludes = i->second.asString();
+ else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
+ else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
+ else if (i->first == DYNAMIC) dynamic = bool(i->second);
+ else if (i->first == SYNC) sync = i->second.asUint16();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == QUEUE_NAME) queueName = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ boost::shared_ptr<Link> link;
+ if (linkName.empty() || !(link = links.getLink(linkName))) {
+ QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
+ throw InvalidParameter(name);
+ }
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
+ dynamic, sync,
+ 0,
+ queueName);
+
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
+ "; src=" << src << "; dest=" << dest << "; key=" << key);
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
} else {
throw UnknownObjectType(type);
}
@@ -696,6 +870,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else if (type == TYPE_LINK) {
+ boost::shared_ptr<Link> link = links.getLink(name);
+ if (link) {
+ link->close();
+ }
+ } else if (type == TYPE_BRIDGE) {
+ boost::shared_ptr<Bridge> bridge = links.getBridge(name);
+ if (bridge) {
+ bridge->close();
+ }
} else {
throw UnknownObjectType(type);
}
@@ -920,6 +1104,13 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
ManagementAgent::toMap(arguments),
"created"));
}
+ QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ << " durable:" << (durable ? "T" : "F")
+ << " owner:" << owner
+ << " autodelete:" << (autodelete ? "T" : "F")
+ << " alternateExchange:" << alternateExchange );
}
return result;
}
@@ -942,6 +1133,10 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+ QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
}
@@ -993,6 +1188,12 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
ManagementAgent::toMap(arguments),
"created"));
}
+ QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ << " type:" << type
+ << " alternateExchange:" << alternateExchange
+ << " durable:" << (durable ? "T" : "F"));
}
return result;
}
@@ -1017,7 +1218,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+ QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
void Broker::bind(const std::string& queueName,
@@ -1047,10 +1250,16 @@ void Broker::bind(const std::string& queueName,
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
} else {
if (queue->bind(exchange, key, arguments)) {
+ getConfigurationObservers().bind(exchange, queue, key, arguments);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
queueName, key, ManagementAgent::toMap(arguments)));
}
+ QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
+ << " queue:" << queueName
+ << " key:" << key
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
}
}
@@ -1082,12 +1291,33 @@ void Broker::unbind(const std::string& queueName,
if (exchange->isDurable() && queue->isDurable()) {
store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
}
+ getConfigurationObservers().unbind(
+ exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
+ QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName
+ << " queue:" << queueName
+ << " key:" << key
+ << " user:" << userId
+ << " rhost:" << connectionId);
}
}
}
+// FIXME aconway 2012-04-27: access to linkClientProperties is
+// not properly thread safe, you could lose fields if 2 threads
+// attempt to add a field concurrently.
+
+framing::FieldTable Broker::getLinkClientProperties() const {
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock);
+ return linkClientProperties;
+}
+
+void Broker::setLinkClientProperties(const framing::FieldTable& ft) {
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock);
+ linkClientProperties = ft;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 135b9340f9..922d0558e5 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -40,6 +40,7 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/ConfigurationObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -64,8 +65,8 @@
namespace qpid {
namespace sys {
- class ProtocolFactory;
- class Poller;
+class ProtocolFactory;
+class Poller;
}
struct Url;
@@ -91,7 +92,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
public management::Manageable,
public RefCounted
{
-public:
+ public:
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
@@ -103,7 +104,6 @@ public:
std::string dataDir;
uint16_t port;
int workerThreads;
- int maxConnections;
int connectionBacklog;
bool enableMgmt;
bool mgmtPublish;
@@ -127,31 +127,14 @@ public:
std::string defaultMsgGroup;
bool timestampRcvMsgs;
double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
+ uint16_t linkHeartbeatInterval;
+ uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
+ std::string fedTag;
private:
std::string getHome();
};
- class ConnectionCounter {
- int maxConnections;
- int connectionCount;
- sys::Mutex connectionCountLock;
- public:
- ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount--;
- }
- bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- return (maxConnections <= connectionCount);
- }
- };
-
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -183,6 +166,7 @@ public:
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
+ ConfigurationObservers configurationObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -203,9 +187,11 @@ public:
bool recovery;
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
+ mutable sys::Mutex linkClientPropertiesLock;
+ framing::FieldTable linkClientProperties;
+
public:
QPID_BROKER_EXTERN virtual ~Broker();
@@ -317,8 +303,6 @@ public:
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- ConnectionCounter& getConnectionCounter() {return connectionCounter;}
-
/**
* Never true in a stand-alone broker. In a cluster, return true
* to defer delivery of messages deliveredg in a cluster-unsafe
@@ -377,6 +361,14 @@ public:
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ ConfigurationObservers& getConfigurationObservers() { return configurationObservers; }
+
+ /** Properties to be set on outgoing link connections */
+ QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
+ QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+
+ /** Information identifying this system */
+ boost::shared_ptr<const System> getSystem() const { return systemObject; }
};
}}
diff --git a/cpp/src/qpid/broker/ConfigurationObserver.h b/cpp/src/qpid/broker/ConfigurationObserver.h
new file mode 100644
index 0000000000..701043db40
--- /dev/null
+++ b/cpp/src/qpid/broker/ConfigurationObserver.h
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_CONFIGURATIONOBSERVER_H
+#define QPID_BROKER_CONFIGURATIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+class Queue;
+class Exchange;
+
+
+/**
+ * Observer for changes to configuration (aka wiring)
+ */
+class ConfigurationObserver
+{
+ public:
+ virtual ~ConfigurationObserver() {}
+ virtual void queueCreate(const boost::shared_ptr<Queue>&) {}
+ virtual void queueDestroy(const boost::shared_ptr<Queue>&) {}
+ virtual void exchangeCreate(const boost::shared_ptr<Exchange>&) {}
+ virtual void exchangeDestroy(const boost::shared_ptr<Exchange>&) {}
+ virtual void bind(const boost::shared_ptr<Exchange>& ,
+ const boost::shared_ptr<Queue>& ,
+ const std::string& /*key*/,
+ const framing::FieldTable& /*args*/) {}
+ virtual void unbind(const boost::shared_ptr<Exchange>&,
+ const boost::shared_ptr<Queue>& ,
+ const std::string& /*key*/,
+ const framing::FieldTable& /*args*/) {}
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONFIGURATIONOBSERVER_H*/
diff --git a/cpp/src/qpid/broker/ConfigurationObservers.h b/cpp/src/qpid/broker/ConfigurationObservers.h
new file mode 100644
index 0000000000..4c1159747d
--- /dev/null
+++ b/cpp/src/qpid/broker/ConfigurationObservers.h
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_CONFIGURATIONOBSERVERS_H
+#define QPID_BROKER_CONFIGURATIONOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConfigurationObserver.h"
+#include "Observers.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A configuration observer that delegates to a collection of
+ * configuration observers.
+ *
+ * THREAD SAFE
+ */
+class ConfigurationObservers : public ConfigurationObserver,
+ public Observers<ConfigurationObserver>
+{
+ public:
+ void queueCreate(const boost::shared_ptr<Queue>& q) {
+ each(boost::bind(&ConfigurationObserver::queueCreate, _1, q));
+ }
+ void queueDestroy(const boost::shared_ptr<Queue>& q) {
+ each(boost::bind(&ConfigurationObserver::queueDestroy, _1, q));
+ }
+ void exchangeCreate(const boost::shared_ptr<Exchange>& e) {
+ each(boost::bind(&ConfigurationObserver::exchangeCreate, _1, e));
+ }
+ void exchangeDestroy(const boost::shared_ptr<Exchange>& e) {
+ each(boost::bind(&ConfigurationObserver::exchangeDestroy, _1, e));
+ }
+ void bind(const boost::shared_ptr<Exchange>& exchange,
+ const boost::shared_ptr<Queue>& queue,
+ const std::string& key,
+ const framing::FieldTable& args) {
+ each(boost::bind(
+ &ConfigurationObserver::bind, _1, exchange, queue, key, args));
+ }
+ void unbind(const boost::shared_ptr<Exchange>& exchange,
+ const boost::shared_ptr<Queue>& queue,
+ const std::string& key,
+ const framing::FieldTable& args) {
+ each(boost::bind(
+ &ConfigurationObserver::unbind, _1, exchange, queue, key, args));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONFIGURATIONOBSERVERS_H*/
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5e339cec03..8d250a32e5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -43,7 +43,7 @@
#include <iostream>
#include <assert.h>
-
+using std::string;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputHandler* out_,
bool link_,
uint64_t objectId_,
bool shadow_,
- bool delayManagement) :
+ bool delayManagement,
+ bool authenticated_
+) :
ConnectionState(out_, broker_),
securitySettings(external),
- adapter(*this, link_, shadow_),
+ shadow(shadow_),
+ authenticated(authenticated_),
+ adapter(*this, link_),
link(link_),
mgmtClosing(false),
mgmtId(mgmtId_),
@@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputHandler* out_,
timer(broker_.getTimer()),
errorListener(0),
objectId(objectId_),
- shadow(shadow_),
outboundTracker(*this)
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::addManagementObject() {
@@ -141,6 +143,8 @@ Connection::~Connection()
// a cluster-unsafe context. Don't raise an event in that case.
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+ QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+ << " rhost:" << mgmtId );
}
broker.getConnectionObservers().closed(*this);
@@ -148,8 +152,9 @@ Connection::~Connection()
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
-
- if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
}
void Connection::received(framing::AMQFrame& frame) {
@@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() {
mgmtObject->set_authIdentity(userId);
agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
}
+
+ QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
+ << " rhost:" << mgmtId );
+
}
void Connection::setUserProxyAuth(bool b)
@@ -300,6 +309,9 @@ void Connection::close(connection::CloseCode code, const string& text)
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -313,6 +325,9 @@ void Connection::sendClose() {
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -326,6 +341,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask {
}
};
+class LinkHeartbeatTask : public qpid::sys::TimerTask {
+ sys::Timer& timer;
+ Connection& connection;
+ bool heartbeatSeen;
+
+ void fire() {
+ if (!heartbeatSeen) {
+ QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection");
+ connection.abort();
+ } else {
+ heartbeatSeen = false;
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+ }
+ }
+
+public:
+ LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) :
+ TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {}
+
+ void heartbeatReceived() { heartbeatSeen = true; }
+};
+
+
void Connection::abort()
{
// Make sure that we don't try to send a heartbeat as we're
@@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat)
}
}
+void Connection::startLinkHeartbeatTimeoutTask() {
+ if (!linkHeartbeatTimer && heartbeat > 0) {
+ linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
+ timer.add(linkHeartbeatTimer);
+ }
+}
+
void Connection::restartTimeout()
{
if (timeoutTimer)
timeoutTimer->touch();
+
+ if (linkHeartbeatTimer) {
+ static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived();
+ }
}
bool Connection::isOpen() { return adapter.isOpen(); }
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 1b8bd83139..d01599ce54 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -27,8 +27,7 @@
#include <vector>
#include <queue>
-#include <boost/ptr_container/ptr_map.hpp>
-
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionHandler.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/SessionHandler.h"
@@ -86,15 +85,22 @@ class Connection : public sys::ConnectionInputHandler,
bool isLink = false,
uint64_t objectId = 0,
bool shadow=false,
- bool delayManagement = false);
+ bool delayManagement = false,
+ bool authenticated=true);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
SessionHandler& getChannel(framing::ChannelId channel);
- /** Close the connection */
- void close(framing::connection::CloseCode code, const std::string& text);
+ /** Close the connection. Waits for the client to respond with close-ok
+ * before actually destroying the connection.
+ */
+ QPID_BROKER_EXTERN void close(
+ framing::connection::CloseCode code, const std::string& text);
+
+ /** Abort the connection. Close abruptly and immediately. */
+ QPID_BROKER_EXTERN void abort();
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -138,8 +144,7 @@ class Connection : public sys::ConnectionInputHandler,
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
void restartTimeout();
- void abort();
-
+
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
f(*ptr_map_ptr(i));
@@ -149,7 +154,10 @@ class Connection : public sys::ConnectionInputHandler,
void setSecureConnection(SecureConnection* secured);
/** True if this is a shadow connection in a cluster. */
- bool isShadow() { return shadow; }
+ bool isShadow() const { return shadow; }
+
+ /** True if this connection is authenticated */
+ bool isAuthenticated() const { return authenticated; }
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
@@ -166,6 +174,7 @@ class Connection : public sys::ConnectionInputHandler,
bool isOpen();
bool isLink() { return link; }
+ void startLinkHeartbeatTimeoutTask();
// Used by cluster during catch-up, see cluster::OutputInterceptor
void doIoCallbacks();
@@ -179,6 +188,8 @@ class Connection : public sys::ConnectionInputHandler,
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
+ bool shadow;
+ bool authenticated;
ConnectionHandler adapter;
const bool link;
bool mgmtClosing;
@@ -189,11 +200,10 @@ class Connection : public sys::ConnectionInputHandler,
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
uint64_t objectId;
- bool shadow;
framing::FieldTable clientProperties;
/**
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 9e0020812b..d5d24ca629 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
@@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
return c.release();
}
-
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 6894324117..06f442a47f 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -36,6 +36,9 @@
using namespace qpid;
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -103,9 +106,10 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured)
handler->secured = secured;
}
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow) : handler(new Handler(connection, isClient, isShadow)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) :
+ handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
proxy(c.getOutput()),
connection(c), serverMode(!isClient), secured(0),
isOpen(false)
@@ -116,14 +120,13 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow)
properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
- authenticator = SaslAuthenticator::createAuthenticator(c, isShadow);
+ authenticator = SaslAuthenticator::createAuthenticator(c);
authenticator->getMechanisms(mechanisms);
Array locales(0x95);
boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
locales.add(l);
proxy.start(properties, mechanisms, locales);
-
}
maxFrameSize = (64 * 1024) - 1;
@@ -149,12 +152,20 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
- if (agent) {
+ bool logEnabled;
+ QPID_LOG_TEST_CAT(debug, model, logEnabled);
+ if (logEnabled || agent)
+ {
string error;
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ }
+ QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+ << " user:" << uid
+ << " reason:" << error );
}
throw;
}
@@ -169,7 +180,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
AclModule* acl = connection.getBroker().getAcl();
FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("ACL denied " << connection.getUserId()
+ << " creating a federation link"));
return;
}
QPID_LOG(info, "Connection is a federation link");
@@ -195,12 +208,20 @@ void ConnectionHandler::Handler::secureOk(const string& response)
authenticator->step(response);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
- if (agent) {
+ bool logEnabled;
+ QPID_LOG_TEST_CAT(debug, model, logEnabled);
+ if (logEnabled || agent)
+ {
string error;
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ }
+ QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
+ << " user:" << uid
+ << " reason:" << error );
}
throw;
}
@@ -278,7 +299,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
service,
host,
0, // TODO -- mgoulish Fri Sep 24 2010
- 256,
+ 256,
false ); // disallow interaction
}
std::string supportedMechanismsList;
@@ -318,7 +339,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
}
- FieldTable ft;
+ FieldTable ft = connection.getBroker().getLinkClientProperties();
ft.setInt(QPID_FED_LINK,1);
ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
@@ -367,8 +388,14 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax,
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
connection.setFrameMax(maxFrameSize);
- connection.setHeartbeat(heartbeatMax);
- proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax);
+ // this method is only ever called when this Connection
+ // is a federation link where this Broker is acting as
+ // a client to another Broker
+ uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax);
+ connection.setHeartbeat(hb);
+ connection.startLinkHeartbeatTimeoutTask();
+
+ proxy.tuneOk(channelMax, maxFrameSize, hb);
proxy.open("/", Array(), true);
}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index 2e25543308..9346e7b1ac 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -61,7 +61,7 @@ class ConnectionHandler : public framing::FrameHandler
SecureConnection* secured;
bool isOpen;
- Handler(Connection& connection, bool isClient, bool isShadow=false);
+ Handler(Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +99,7 @@ class ConnectionHandler : public framing::FrameHandler
bool handle(const qpid::framing::AMQMethodBody& method);
public:
- ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
+ ConnectionHandler(Connection& connection, bool isClient );
void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
diff --git a/cpp/src/qpid/broker/ConnectionObservers.h b/cpp/src/qpid/broker/ConnectionObservers.h
index 07e515f3c9..e9014c80c3 100644
--- a/cpp/src/qpid/broker/ConnectionObservers.h
+++ b/cpp/src/qpid/broker/ConnectionObservers.h
@@ -23,9 +23,7 @@
*/
#include "ConnectionObserver.h"
-#include "qpid/sys/Mutex.h"
-#include <set>
-#include <algorithm>
+#include "Observers.h"
namespace qpid {
namespace broker {
@@ -35,18 +33,10 @@ namespace broker {
* Calling a ConnectionObserver function will call that function on each observer.
* THREAD SAFE.
*/
-class ConnectionObservers : public ConnectionObserver {
+class ConnectionObservers : public ConnectionObserver,
+ public Observers<ConnectionObserver>
+{
public:
- void add(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.insert(observer);
- }
-
- void remove(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.erase(observer);
- }
-
void connection(Connection& c) {
each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
}
@@ -62,16 +52,6 @@ class ConnectionObservers : public ConnectionObserver {
void forced(Connection& c, const std::string& text) {
each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
}
-
- private:
- typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
- sys::Mutex lock;
- Observers observers;
-
- template <class F> void each(F f) {
- sys::Mutex::ScopedLock l(lock);
- std::for_each(observers.begin(), observers.end(), f);
- }
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 682c75ed4f..64073621be 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -54,7 +54,9 @@ class Consumer
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
+ /**@return the position of the last message seen by this consumer */
virtual framing::SequenceNumber getPosition() const { return position; }
+
virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
virtual bool deliver(QueuedMessage& msg) = 0;
diff --git a/cpp/src/qpid/broker/Daemon.h b/cpp/src/qpid/broker/Daemon.h
index a9cd98bce2..2bb9fc5577 100644
--- a/cpp/src/qpid/broker/Daemon.h
+++ b/cpp/src/qpid/broker/Daemon.h
@@ -74,7 +74,6 @@ class Daemon : private boost::noncopyable
pid_t pid;
int pipeFds[2];
- int lockFileFd;
std::string lockFile;
std::string pidDir;
};
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 5d9aea7509..2fa7ce0fc5 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -26,6 +26,9 @@
#include <iostream>
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::management::Manageable;
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 8d20b0df81..82d4b4df15 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -35,6 +35,8 @@
namespace qpid {
namespace broker {
+using std::string;
+
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
@@ -167,7 +169,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
- name(_name), durable(false), persistenceId(0), sequence(false),
+ name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 7376f814ed..fba752210f 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -174,8 +174,9 @@ public:
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
- Exchange::shared_ptr getAlternate() { return alternate; }
- void setAlternate(Exchange::shared_ptr _alternate);
+ QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
+ QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
+
void incAlternateUsers() { alternateUsers++; }
void decAlternateUsers() { alternateUsers--; }
bool inUseAsAlternate() { return alternateUsers > 0; }
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 43d7268dfb..b31c7bd7b8 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
@@ -42,38 +43,42 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
bool durable, const FieldTable& args){
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- Exchange::shared_ptr exchange;
-
- if (type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
- }else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
- }else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
- }else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementDirectExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
- }else if (type == ManagementTopicExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
- }else if (type == Link::exchangeTypeName) {
- exchange = Link::linkExchangeFactory(name);
- }else{
- FunctionMap::iterator i = factory.find(type);
- if (i == factory.end()) {
- throw UnknownExchangeTypeException();
- } else {
- exchange = i->second(name, durable, args, parent, broker);
+ Exchange::shared_ptr exchange;
+ std::pair<Exchange::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ if (type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+ }else if (type == Link::exchangeTypeName) {
+ exchange = Link::linkExchangeFactory(name);
+ }else{
+ FunctionMap::iterator i = factory.find(type);
+ if (i == factory.end()) {
+ throw UnknownExchangeTypeException();
+ } else {
+ exchange = i->second(name, durable, args, parent, broker);
+ }
}
+ exchanges[name] = exchange;
+ result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
- exchanges[name] = exchange;
- return std::pair<Exchange::shared_ptr, bool>(exchange, true);
- } else {
- return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
+ return result;
}
void ExchangeRegistry::destroy(const string& name){
@@ -82,12 +87,17 @@ void ExchangeRegistry::destroy(const string& name){
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i != exchanges.end()) {
- i->second->destroy();
- exchanges.erase(i);
+ Exchange::shared_ptr exchange;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i != exchanges.end()) {
+ exchange = i->second;
+ i->second->destroy();
+ exchanges.erase(i);
+ }
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 2bce99b6fe..56c894c129 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -24,6 +24,9 @@
#include <algorithm>
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 6648ae0422..9975d26c72 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -26,6 +26,9 @@
using namespace qpid::broker;
+
+using std::string;
+
using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index f21c861149..84dd163ac3 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -125,18 +125,20 @@ boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name
return Exchange::shared_ptr(new LinkExchange(_name));
}
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
+Link::Link(const string& _name,
+ LinkRegistry* _links,
const string& _host,
uint16_t _port,
const string& _transport,
+ DestroyedListener l,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
- Manageable* parent)
- : links(_links), store(_store),
+ Manageable* parent,
+ bool failover_)
+ : name(_name), links(_links),
configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
host(_host), port(_port), transport(_transport),
durable(_durable),
@@ -149,7 +151,9 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
+ listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failover(failover_),
failoverChannel(0)
{
if (parent != 0 && broker != 0)
@@ -157,7 +161,10 @@ Link::Link(LinkRegistry* _links,
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
agent->addObject(mgmtObject, 0, durable);
}
}
@@ -169,13 +176,15 @@ Link::Link(LinkRegistry* _links,
}
broker->getTimer().add(timerTask);
- stringstream _name;
- _name << "qpid.link." << transport << ":" << host << ":" << port;
- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
- exchangeTypeName);
- failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
- assert(failoverExchange);
- failoverExchange->setLink(this);
+ if (failover) {
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc =
+ broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
+ }
}
Link::~Link ()
@@ -187,7 +196,8 @@ Link::~Link ()
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
- broker->getExchanges().destroy(failoverExchange->getName());
+ if (failover)
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -239,16 +249,19 @@ void Link::established(Connection* c)
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- connection = c;
- if (closing)
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ isClosing = closing;
+ }
+ if (isClosing)
destroy();
else // Process any IO tasks bridges added before established.
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -261,16 +274,26 @@ void Link::setUrl(const Url& u) {
namespace {
- /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
- void sessionDetached(Link *link) {
- QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
- }
+class DetachedCallback : public SessionHandler::ErrorListener {
+ public:
+ DetachedCallback(const Link& link) : name(link.getName()) {}
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+ void detach() {}
+ private:
+ const std::string name;
+};
}
-
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
+
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ }
+
// Get default URL from known-hosts if not already set
if (url.empty()) {
const std::vector<Url>& known = connection->getKnownHosts();
@@ -282,80 +305,82 @@ void Link::opened() {
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
- //
- // attempt to subscribe to failover exchange for updates from remote
- //
-
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
- failoverChannel = nextChannel();
-
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
- failoverSession = queueName;
- sessionHandler.attachAs(failoverSession);
-
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-
- remoteBroker.getQueue().declare(queueName,
- "", // alt-exchange
- false, // passive
- false, // durable
- true, // exclusive
- true, // auto-delete
- FieldTable());
- remoteBroker.getExchange().bind(queueName,
- FAILOVER_EXCHANGE,
- "", // no key
- FieldTable());
- remoteBroker.getMessage().subscribe(queueName,
- failoverExchange->getName(),
- 1, // implied-accept mode
- 0, // pre-acquire mode
- false, // exclusive
- "", // resume-id
- 0, // resume-ttl
+ if (failover) {
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
+ FieldTable());
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
FieldTable());
- remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
- remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ }
}
void Link::closed(int, std::string text)
{
- bool isClosing = false;
- {
- Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
+ Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
- connection = 0;
- if (state == STATE_OPERATIONAL) {
+ connection = 0;
+
+ if (!hideManagement()) {
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
stringstream addr;
addr << host << ":" << port;
- if (!hideManagement() && agent)
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
- if (state != STATE_FAILED && state != STATE_PASSIVE)
- {
- setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
- }
+ if (state != STATE_FAILED && state != STATE_PASSIVE)
+ {
+ setStateLH(STATE_WAITING);
+ if (!hideManagement())
+ mgmtObject->set_lastError (text);
}
- // Call destroy outside of the lock, don't want to be deleted with lock held.
- if (isClosing)
- destroy();
}
-// Called in connection IO thread.
+// Called in connection IO thread, cleans up the connection before destroying Link
void Link::destroy ()
{
Bridges toDelete;
+
+ timerTask->cancel(); // call prior to locking so maintenance visit can finish
{
Mutex::ScopedLock mutex(lock);
@@ -374,14 +399,13 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
-
- timerTask->cancel();
}
+
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
+ (*i)->close();
toDelete.clear();
- links->destroy (configuredHost, configuredPort);
+ listener(this); // notify LinkRegistry that this Link has been destroyed
}
void Link::add(Bridge::shared_ptr bridge)
@@ -423,7 +447,7 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL)
+ if (state != STATE_OPERATIONAL || closing)
return;
// check for bridge session errors and recover
@@ -460,7 +484,7 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
-
+ if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -476,21 +500,27 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
+ else if (state == STATE_OPERATIONAL &&
+ (!active.empty() || !created.empty() || !cancellations.empty()) &&
+ connection && connection->isOpen())
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
+}
void Link::reconnectLH(const Address& a)
{
host = a.host;
port = a.port;
transport = a.protocol;
- startConnectionLH();
+
if (!hideManagement()) {
stringstream errorString;
- errorString << "Failed over to " << a;
+ errorString << "Failing over to " << a;
mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
}
+ startConnectionLH();
}
bool Link::tryFailoverLH() {
@@ -499,15 +529,14 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
reconnectLH(next);
return true;
}
return false;
}
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
@@ -536,18 +565,34 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return configuredHost;
+ return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
+ string name;
+ if (kind == ENCODED_IDENTIFIER) {
+ // newer version provides a link name.
+ buffer.getShortString(name);
+ }
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -556,12 +601,21 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a name for the new Link.
+ */
+ name = createName(transport, host, port);
+ }
+
+ return links.declare(name, host, port, transport, durable, authMechanism,
+ username, password).first;
}
void Link::encode(Buffer& buffer) const
{
- buffer.putShortString(string("link"));
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
buffer.putShortString(configuredHost);
buffer.putShort(configuredPort);
buffer.putShortString(configuredTransport);
@@ -573,8 +627,9 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return configuredHost.size() + 1 // short-string (host)
- + 5 // short-string ("link")
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + configuredHost.size() + 1 // short-string (host)
+ 2 // port
+ configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
@@ -589,6 +644,7 @@ ManagementObject* Link::GetManagementObject (void) const
}
void Link::close() {
+ QPID_LOG(debug, "Link::close(), link=" << name );
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
@@ -609,36 +665,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
+ /* TBD: deprecate this interface in favor of the Broker::create() method. The
+ * Broker::create() method allows the user to assign a name to the bridge.
+ */
+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
+
+ // Does a bridge already exist that has the src/dest/key? If so, re-use the
+ // existing bridge - this behavior is backward compatible with previous releases.
+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+ if (!bridge) {
+ // need to create a new bridge on this link.
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+ *this, iargs.i_durable,
+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+ if (!rc.first) {
+ text = "invalid parameters";
+ return Manageable::STATUS_PARAMETER_INVALID;
}
}
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
return Manageable::STATUS_OK;
}
@@ -666,11 +717,13 @@ void Link::closeConnection( const std::string& reason)
{
if (connection != 0) {
// cancel our subscription to the failover exchange
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- if (sessionHandler.getSession()) {
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
- remoteBroker.getMessage().cancel(failoverExchange->getName());
- remoteBroker.getSession().detach(failoverSession);
+ if (failover) {
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
@@ -716,6 +769,23 @@ void Link::setState(const framing::FieldTable& state)
}
}
+std::string Link::createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port)
+{
+ stringstream linkName;
+ linkName << QPID_NAME_PREFIX << transport << std::string(":")
+ << host << std::string(":") << port;
+ return linkName.str();
+}
+
+
+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
+{
+ Mutex::ScopedLock mutex(lock);
+ return (isConnecting() && _port == port && _host == host);
+}
+
const std::string Link::exchangeTypeName("qpid.LinkExchange");
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index a97fa48664..f0cb90e73b 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
@@ -52,8 +51,8 @@ class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
mutable sys::Mutex lock;
+ const std::string name;
LinkRegistry* links;
- MessageStore* store;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
@@ -64,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable {
uint16_t port;
std::string transport;
- bool durable;
+ bool durable;
+
std::string authMechanism;
std::string username;
std::string password;
@@ -85,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
+ boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
@@ -101,33 +103,39 @@ class Link : public PersistableConfig, public management::Manageable {
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
+ void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void reconnectLH(const Address&); //called by LinkRegistry
- void established(Connection*); // Called when connection is create
+ // connection management (called by LinkRegistry)
+ void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
+ void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
+ bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
+ typedef boost::function<void(Link*)> DestroyedListener;
- Link(LinkRegistry* links,
- MessageStore* store,
+ Link(const std::string& name,
+ LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
+ DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password,
Broker* broker,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0,
+ bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
@@ -139,7 +147,7 @@ class Link : public PersistableConfig, public management::Manageable {
/** returns the current address of the remote, which may be different from the
configured transport/host/port due to failover. Returns true if connection is
active */
- bool getRemoteAddress(qpid::Address& addr) const;
+ QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
@@ -148,15 +156,17 @@ class Link : public PersistableConfig, public management::Manageable {
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
- QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+ // Close the link.
+ QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
- void notifyConnectionForced(const std::string text);
void setPassive(bool p);
+ bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
@@ -165,7 +175,10 @@ class Link : public PersistableConfig, public management::Manageable {
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
@@ -178,6 +191,16 @@ class Link : public PersistableConfig, public management::Manageable {
// replicate internal state of this Link for clustering
void getState(framing::FieldTable& state) const;
void setState(const framing::FieldTable& state);
+
+ /** create a name for a link (if none supplied by user config) */
+ static std::string createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port);
+
+ /** The current connction for this link. Note returns 0 if the link is not
+ * presently connected.
+ */
+ Connection* getConnection() { return connection; }
};
}
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index d89f220d1b..0507fe6521 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
LinkRegistry::~LinkRegistry() {}
+/** find link by the *configured* remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport)
+{
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+ Link::shared_ptr& link = i->second;
+ if (link->getHost() == host &&
+ link->getPort() == port &&
+ (transport.empty() || link->getTransport() == transport))
+ return link;
+ }
+ return boost::shared_ptr<Link>();
+}
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
{
Mutex::ScopedLock locker(lock);
- std::string oldKey = createKey(oldAddress);
- std::string newKey = createKey(newAddress);
- if (links.find(newKey) != links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- } else {
- LinkMap::iterator i = links.find(oldKey);
- if (i == links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- } else {
- links[newKey] = i->second;
- links.erase(oldKey);
- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- }
- }
+ LinkMap::iterator l = links.find(name);
+ if (l != links.end())
+ return l->second;
+ return boost::shared_ptr<Link>();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+ const string& host,
uint16_t port,
const string& transport,
bool durable,
const string& authMechanism,
const string& username,
- const string& password)
+ const string& password,
+ bool failover)
{
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i == links.end())
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
- authMechanism, username, password,
- broker, parent));
- links[key] = link;
+ link = Link::shared_ptr (
+ new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent, failover));
+ if (durable && store) store->create(*link);
+ links[name] = link;
+ QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
- uint16_t port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+ if (i->second->getSrc() == src && i->second->getDest() == dest &&
+ i->second->getKey() == key && i->second->getLink() &&
+ i->second->getLink()->getName() == link.getName()) {
+ return i->second;
+ }
+ }
+ return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+ Mutex::ScopedLock locker(lock);
+ BridgeMap::iterator b = bridges.find(name);
+ if (b != bridges.end())
+ return b->second;
+ return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
const std::string& excludes,
bool dynamic,
uint16_t sync,
- Bridge::InitializeCallback init
+ Bridge::InitializeCallback init,
+ const std::string& queueName,
+ const std::string& altExchange
)
{
Mutex::ScopedLock locker(lock);
- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
+ // Durable bridges are only valid on durable links
+ if (durable && !link.isDurable()) {
+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ if (dynamic) {
+ Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+ if (exchange.get() == 0) {
+ QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ }
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
@@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
args.i_sync = sync;
bridge = Bridge::shared_ptr
- (new Bridge (l->second.get(), l->second->nextChannel(),
- boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key),
- args, init));
- bridges[bridgeKey] = bridge;
- l->second->add(bridge);
+ (new Bridge (name, &link, link.nextChannel(),
+ boost::bind(&LinkRegistry::destroyBridge, this, _1),
+ args, init, queueName, altExchange));
+ bridges[name] = bridge;
+ link.add(bridge);
+ if (durable && store)
+ store->create(*bridge);
+
+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+ "' from " << src << " to " << dest << " (" << key << ")");
+
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(Link *link)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
if (i->second->isDurable() && store)
@@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
-void LinkRegistry::destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(Bridge *bridge)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName());
Mutex::ScopedLock locker(lock);
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
-
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return;
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(bridge->getName());
if (b == bridges.end())
return;
- l->second->cancel(b->second);
+ Link *link = b->second->getLink();
+ if (link) {
+ link->cancel(b->second);
+ }
if (b->second->isDurable())
store->destroy(*(b->second));
bridges.erase(b);
@@ -219,26 +266,71 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
- // Convert keyOrMgmtId to a host:port key.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
- // connection management IDs. Currently sys:: protocol factories
- // and IO plugins construct the IDs and LinkRegistry parses them.
- size_t separator = keyOrMgmtId.find('-');
- if (separator == std::string::npos) separator = 0;
- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+ {
+ // Extract host and port of remote broker from connection id string.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses connection
+ // management IDs. Currently sys:: protocol factories and IO plugins construct the
+ // IDs and LinkRegistry parses them.
+ // KAG: current connection id format assumed:
+ // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
+ // contained within brackets "[...]", example:
+ // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
+ // if this assumption changes!
+ size_t separator = connId.find('-');
+ assert(separator != std::string::npos);
+ std::string remote = connId.substr(separator+1, std::string::npos);
+ separator = remote.rfind(":");
+ assert(separator != std::string::npos);
+ *host = remote.substr(0, separator);
+ // IPv6 - host is bracketed by "[]", strip them
+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
+ *host = host->substr(1, host->length() - 2);
+ }
+ try {
+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+ } catch (const boost::bad_lexical_cast&) {
+ QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
+ assert(false);
+ }
+ }
+}
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end()) return l->second;
- else return Link::shared_ptr();
+ ConnectionMap::iterator c = connections.find(connId);
+ if (c != connections.end()) {
+ LinkMap::iterator l = links.find(c->second);
+ if (l != links.end())
+ return l->second;
+ }
+ return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
- Link::shared_ptr link = findLink(key);
+ // find a link that is attempting to connect to the remote, and
+ // create a mapping from connection id to link
+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+ std::string host;
+ uint16_t port = 0;
+ extractHostPort( key, &host, &port );
+ Link::shared_ptr link;
+ {
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ if (l->second->pendingConnection(host, port)) {
+ link = l->second;
+ connections[key] = link->getName();
+ break;
+ }
+ }
+ }
+
if (link) {
link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
@@ -343,20 +435,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
-std::string LinkRegistry::createKey(const qpid::Address& a) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- stringstream keystream;
- keystream << host << ":" << port;
- return keystream.str();
-}
-
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
@@ -369,10 +447,12 @@ void LinkRegistry::setPassive(bool p)
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ Mutex::ScopedLock locker(lock);
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ Mutex::ScopedLock locker(lock);
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 8e9d2f4b0d..5a39b62bd1 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -42,9 +42,11 @@ namespace broker {
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+ typedef std::map<std::string, std::string> ConnectionMap;
- LinkMap links;
- BridgeMap bridges;
+ LinkMap links; /** indexed by name of Link */
+ BridgeMap bridges; /** indexed by name of Bridge */
+ ConnectionMap connections; /** indexed by connection identifier, gives link name */
qpid::sys::Mutex lock;
Broker* broker;
@@ -54,15 +56,18 @@ namespace broker {
std::string realm;
boost::shared_ptr<Link> findLink(const std::string& key);
- static std::string createKey(const Address& address);
- static std::string createKey(const std::string& host, uint16_t port);
- // Methods called by the connection observer.
+ // Methods called by the connection observer, key is connection identifier
void notifyConnection (const std::string& key, Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
- friend class LinkRegistryConnectionObserver;
+ friend class LinkRegistryConnectionObserver;
+
+ /** Notify the registry that a Link has been destroyed */
+ void linkDestroyed(Link*);
+ /** Request to destroy a Bridge */
+ void destroyBridge(Bridge*);
public:
QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
@@ -70,17 +75,29 @@ namespace broker {
QPID_BROKER_EXTERN ~LinkRegistry();
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
- declare(const std::string& host,
+ declare(const std::string& name,
+ const std::string& host,
uint16_t port,
const std::string& transport,
bool durable,
const std::string& authMechanism,
const std::string& username,
- const std::string& password);
+ const std::string& password,
+ bool failover=true);
+
+ /** determine if Link exists */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& name);
+ /** host,port,transport will be matched against the configured values, which may
+ be different from the current values due to failover */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& configHost,
+ uint16_t configPort,
+ const std::string& configTransport = std::string());
QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
- declare(const std::string& host,
- uint16_t port,
+ declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -91,16 +108,18 @@ namespace broker {
const std::string& excludes,
bool dynamic,
uint16_t sync,
- Bridge::InitializeCallback=0
+ Bridge::InitializeCallback=0,
+ const std::string& queueName="",
+ const std::string& altExchange=""
);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key);
+ /** determine if Bridge exists */
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const std::string& name);
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
@@ -126,11 +145,6 @@ namespace broker {
QPID_BROKER_EXTERN uint16_t getPort (const std::string& key);
/**
- * Called by links failing over to new address
- */
- void changeAddress(const Address& oldAddress, const Address& newAddress);
-
- /**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
* updated but links won't actually establish connections and
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 40dfba39f4..4dd8a349dd 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -384,6 +384,18 @@ void Message::addTraceId(const std::string& id)
}
}
+void Message::clearTrace()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (isA<MessageTransferBody>()) {
+ FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
+ std::string trace = headers.getAsString(X_QPID_TRACE);
+ if (!trace.empty()) {
+ headers.setString(X_QPID_TRACE, "");
+ }
+ }
+}
+
void Message::setTimestamp()
{
sys::Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index dda45d73e6..90e4eec889 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -161,6 +161,7 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
+ void clearTrace();
void forcePersistent();
bool isForcedPersistent();
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index f70c996975..83c8ca6868 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing::SequenceNumber& position)
bool MessageDeque::deleted(const QueuedMessage& m)
{
size_t i = index(m.position);
- if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
- messages[i].status = QueuedMessage::DELETED;
- clean();
- return true;
- } else {
- return false;
+ if (i < messages.size()) {
+ QueuedMessage *qm = &messages[i];
+ if (qm->status != QueuedMessage::DELETED) {
+ qm->status = QueuedMessage::DELETED;
+ qm->payload = 0; // message no longer needed
+ clean();
+ return true;
+ }
}
+ return false;
}
size_t MessageDeque::size()
@@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
++available;
+ clean(); // QPID-4046: let producer help clean the backlog of deleted messages
return &messages.back();
}
@@ -173,12 +177,37 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired)
}
}
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+ size_t i = index(n+1);
+ if (i >= messages.size()) return; // Nothing to do.
+
+ // Assertion to verify the precondition: no messaages after n.
+ assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+ messages.end());
+ messages.erase(messages.begin()+i, messages.end());
+ if (head >= messages.size()) head = messages.size() - 1;
+ // Re-count the available messages
+ available = 0;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) ++available;
+ }
+}
+
void MessageDeque::clean()
{
- while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+ // QPID-4046: If a queue has multiple consumers, then it is possible for a large
+ // collection of deleted messages to build up. Limit the number of messages cleaned
+ // up on each call to clean().
+ size_t count = 0;
+ while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
messages.pop_front();
- if (head) --head;
+ count += 1;
}
+ head = (head > count) ? head - count : 0;
}
void MessageDeque::foreach(Functor f)
diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h
index 9b53716d4e..c5670b2a72 100644
--- a/cpp/src/qpid/broker/MessageDeque.h
+++ b/cpp/src/qpid/broker/MessageDeque.h
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
-
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
index 9b164d4e5c..592f3fefde 100644
--- a/cpp/src/qpid/broker/MessageMap.cpp
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
QueuedMessage& a = messages[added.position];
a = added;
a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message at " << a.position);
+ QPID_LOG(debug, "Added message " << a);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
+ QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
return true;
}
}
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+ // Nothing to do, just assert that the precondition is respected and there
+ // are no undeleted messages after seq.
+ (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h
index a668450250..1f0481cb6b 100644
--- a/cpp/src/qpid/broker/MessageMap.h
+++ b/cpp/src/qpid/broker/MessageMap.h
@@ -6,7 +6,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+o * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
virtual void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
index 61e9fa110a..45f5e6cd81 100644
--- a/cpp/src/qpid/broker/Messages.h
+++ b/cpp/src/qpid/broker/Messages.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
namespace qpid {
@@ -101,14 +102,22 @@ class Messages
virtual void updateAcquired(const QueuedMessage&) { }
/**
+ * Set the position of the back of the queue. Next message enqueued will be n+1.
+ *@pre Any messages with seq > n must already be dequeued.
+ */
+ virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+ /**
* Apply, the functor to each message held
*/
+
virtual void foreach(Functor) = 0;
/**
* Remove every message held that for which the specified
* predicate returns true
*/
virtual void removeIf(Predicate) = 0;
+
private:
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h
index 6ea25c9797..2e9f7febe2 100644
--- a/cpp/src/qpid/broker/NameGenerator.h
+++ b/cpp/src/qpid/broker/NameGenerator.h
@@ -32,6 +32,7 @@ namespace qpid {
NameGenerator(const std::string& base);
std::string generate();
};
+ const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names
}
}
diff --git a/cpp/src/qpid/broker/Observers.h b/cpp/src/qpid/broker/Observers.h
new file mode 100644
index 0000000000..c62f75d6d0
--- /dev/null
+++ b/cpp/src/qpid/broker/Observers.h
@@ -0,0 +1,69 @@
+#ifndef QPID_BROKER_OBSERVERS_H
+#define QPID_BROKER_OBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for collections of observers with thread-safe add/remove and traversal.
+ */
+template <class Observer>
+class Observers
+{
+ public:
+ void add(boost::shared_ptr<Observer> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.push_back(observer);
+ }
+
+ void remove(boost::shared_ptr<Observer> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ typename List::iterator i = std::find(observers.begin(), observers.end(), observer);
+ observers.erase(i);
+ }
+
+ protected:
+ typedef std::vector<boost::shared_ptr<Observer> > List;
+
+ sys::Mutex lock;
+ List observers;
+
+ template <class F> void each(F f) {
+ List copy;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ copy = observers;
+ }
+ std::for_each(copy.begin(), copy.end(), f);
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_OBSERVERS_H*/
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index ab5ec7235a..9a0fead744 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
fifo.updateAcquired(acquired);
}
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+ fifo.setPosition(n);
+}
+
void PriorityQueue::foreach(Functor f)
{
fifo.foreach(f);
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
index 8628745db1..301367358b 100644
--- a/cpp/src/qpid/broker/PriorityQueue.h
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/PrivateImplRef.h b/cpp/src/qpid/broker/PrivateImplRef.h
index 5932ab882b..d20c2f4608 100644
--- a/cpp/src/qpid/broker/PrivateImplRef.h
+++ b/cpp/src/qpid/broker/PrivateImplRef.h
@@ -88,15 +88,15 @@ template <class T> class PrivateImplRef {
/** Set the implementation pointer in a handle */
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
- if (t.impl) boost::intrusive_ptr_release(t.impl);
+ if (t.impl) intrusive_ptr_release(t.impl);
t.impl = p.get();
- if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ if (t.impl) intrusive_ptr_add_ref(t.impl);
}
// Helper functions to implement the ctor, dtor, copy, assign
- static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
- static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e7305c021d..d5267c78dc 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -49,6 +49,7 @@
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <iostream>
#include <algorithm>
@@ -67,6 +68,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using std::string;
using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -176,7 +178,8 @@ Queue::Queue(const string& _name, bool _autodelete,
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
+ mgmtObject->set_exclusive(_owner != 0);
agent->addObject(mgmtObject, 0, store != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
@@ -587,21 +590,51 @@ QueuedMessage Queue::get(){
return msg;
}
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+ std::deque<QueuedMessage>& collection)
{
- if (message.payload->hasExpired()) {
- expired.push_back(message);
+ if (predicate(qm)) {
+ collection.push_back(qm);
return true;
} else {
return false;
}
}
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+ std::deque<QueuedMessage>& dequeued)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
+ }
+ if (!dequeued.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(dequeued.size());
+ }
+ for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+ i != dequeued.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
+ }
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
@@ -609,37 +642,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> expired;
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- if (!expired.empty()) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(boost::bind(&isExpired, _1), dequeued);
+ if (dequeued.size()) {
if (mgmtObject) {
- mgmtObject->inc_acquires(expired.size());
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(expired.size());
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ mgmtObject->inc_discardsTtl(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(dequeued.size());
}
}
}
}
-
namespace {
// for use with purge/move below - collect messages that match a given filter
//
@@ -797,6 +811,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
// now reroute if necessary
if (dest.get()) {
assert(qmsg->payload);
+ qmsg->payload->clearTrace();
DeliverableMessage dmsg(qmsg->payload);
dest->routeWithAlternate(dmsg);
}
@@ -888,9 +903,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (mgmtObject) {
mgmtObject->inc_acquires();
mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject)
+ if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires();
brokerMgmtObject->inc_discardsLvq();
+ }
}
if (isRecovery) {
//can't issue new requests for the store until
@@ -1470,12 +1486,18 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
+
+ if (broker.getManagementAgent())
+ broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
+ QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+ << " user:" << userId
+ << " rhost:" << connectionId );
}
}
@@ -1483,9 +1505,11 @@ struct AutoDeleteTask : qpid::sys::TimerTask
{
Broker& broker;
Queue::shared_ptr queue;
+ std::string connectionId;
+ std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
void fire()
{
@@ -1493,19 +1517,19 @@ struct AutoDeleteTask : qpid::sys::TimerTask
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
}
@@ -1659,13 +1683,28 @@ void Queue::query(qpid::types::Variant::Map& results) const
if (allocator) allocator->query(results);
}
+namespace {
+struct After {
+ framing::SequenceNumber seq;
+ After(framing::SequenceNumber s) : seq(s) {}
+ bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
+ if (n < sequence) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(After(n), dequeued);
+ messages->setPosition(n);
+ }
sequence = n;
QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
+ Mutex::ScopedLock locker(messageLock);
return sequence;
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9869a698c1..a31e0002ea 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void configureImpl(const qpid::framing::FieldTable& settings);
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
+ void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
public:
@@ -343,7 +344,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>,
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}
- /** Set the position sequence number for the next message on the queue.
- * Must be >= the current sequence number.
- * Used by cluster to replicate queues.
+ /**
+ * Set the sequence number for the back of the queue, the
+ * next message enqueued will be pos+1.
+ * If pos > getPosition() this creates a gap in the sequence numbers.
+ * if pos < getPosition() the back of the queue is reset to pos,
+ *
+ * The _caller_ must ensure that any messages after pos have been dequeued.
+ *
+ * Used by HA/cluster code for queue replication.
*/
QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
- /** return current position sequence number for the next message on the queue.
+
+ /**
+ *@return sequence number for the back of the queue. The next message pushed
+ * will be at getPosition+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index f15bb45c01..14fe5f4022 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -75,8 +75,8 @@ namespace {
result = v->get<int64_t>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
+ } else if (v->convertsTo<std::string>()) {
+ std::string s(v->get<std::string>());
QPID_LOG(debug, "Got string value for " << key << ": " << s);
std::istringstream convert(s);
if (convert >> result && result >= 0) return result;
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index d5b4c1ae86..3978420f4e 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -133,8 +133,8 @@ T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue
result = v->get<T>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
+ } else if (v->convertsTo<std::string>()) {
+ std::string s(v->get<std::string>());
QPID_LOG(debug, "Got string value for " << key << ": " << s);
std::istringstream convert(s);
if (convert >> result && result >= 0 && convert.eof()) return result;
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 236d5ae34c..1401356444 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueEvents.h"
@@ -46,40 +47,49 @@ QueueRegistry::declare(const string& declareName, bool durable,
definition from persistente
record*/)
{
- RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
+ Queue::shared_ptr queue;
+ std::pair<Queue::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
- if (alternate) {
- queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
- if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
+ if (i == queues.end()) {
+ queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ if (alternate) {
+ queue->setAlternateExchange(alternate);//need to do this *before* create
+ alternate->incAlternateUsers();
+ }
+ if (!recovering) {
+ //apply settings & create persistent record if required
+ queue->create(arguments);
+ } else {
+ //i.e. recovering a queue for which we already have a persistent record
+ queue->configure(arguments);
+ }
+ queues[name] = queue;
+ if (lastNode) queue->setLastNodeFailure();
+ result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
- queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
-
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ return result;
}
-void QueueRegistry::destroyLH (const string& name){
- queues.erase(name);
-}
-
-void QueueRegistry::destroy (const string& name){
- RWlock::ScopedWlock locker(lock);
- destroyLH (name);
+void QueueRegistry::destroy(const string& name) {
+ Queue::shared_ptr q;
+ {
+ qpid::sys::RWlock::ScopedWlock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i != queues.end()) {
+ Queue::shared_ptr q = i->second;
+ queues.erase(i);
+ }
+ }
+ if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index f724e6b10c..a354513c5f 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ class QueueRegistry {
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
const std::string& name,
bool durable = false,
- bool autodelete = false,
+ bool autodelete = false,
const OwnershipToken* owner = 0,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
const qpid::framing::FieldTable& args = framing::FieldTable(),
@@ -82,9 +82,8 @@ class QueueRegistry {
QPID_BROKER_EXTERN void destroy(const std::string& name);
template <class Test> bool destroyIf(const std::string& name, Test test)
{
- qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
- destroyLH (name);
+ destroy(name);
return true;
} else {
return false;
@@ -127,13 +126,13 @@ class QueueRegistry {
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
-
+
/**
* Change queue mode when cluster size drops to 1 node, expands again
* in practice allows flow queue to disk when last name to be exectuted
*/
void updateQueueClusterState(bool lastNode);
-
+
private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
@@ -144,12 +143,9 @@ private:
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
Broker* broker;
-
- //destroy impl that assumes lock is already held:
- void destroyLH (const std::string& name);
};
-
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index d08409695e..858535637a 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
{
string kind;
-
+ uint32_t p = buffer.getPosition();
buffer.getShortString (kind);
- if (kind == "link")
+ buffer.setPosition(p);
+
+ if (Link::isEncodedLink(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- else if (kind == "bridge")
+ else if (Bridge::isEncodedBridge(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 80fa5e1c0e..2d7c820b63 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
# include "config.h"
#endif
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -37,6 +38,7 @@
using qpid::sys::cyrus::CyrusSecurityLayer;
#endif
+using std::string;
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
using qpid::sys::SecuritySettings;
@@ -164,13 +166,17 @@ void SaslAuthenticator::fini(void)
#endif
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
{
if (c.getBroker().getOptions().auth) {
- if ( isShadow )
- return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
- else
- return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ // The cluster creates non-authenticated connections for internal shadow connections
+ // that are never connected to an external client.
+ if ( !c.isAuthenticated() )
+ return std::auto_ptr<SaslAuthenticator>(
+ new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ else
+ return std::auto_ptr<SaslAuthenticator>(
+ new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
} else {
QPID_LOG(debug, "SASL: No Authentication Performed");
return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
@@ -178,7 +184,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
}
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -214,7 +220,7 @@ void NullAuthenticator::start(const string& mechanism, const string* response)
} else if (i != string::npos) {
//authorization id is first null delimited field
uid = response->substr(0, i);
- }//else not a valid SASL PLAIN response, throw error?
+ }//else not a valid SASL PLAIN response, throw error?
if (!uid.empty()) {
//append realm if it has not already been added
i = uid.find(realm);
@@ -226,7 +232,12 @@ void NullAuthenticator::start(const string& mechanism, const string* response)
}
} else {
connection.setUserId("anonymous");
- }
+ }
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
}
@@ -240,7 +251,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t)
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
@@ -271,17 +282,17 @@ void CyrusAuthenticator::init()
NULL, /* Callbacks */
0, /* Connection flags */
&sasl_conn);
-
+
if (SASL_OK != code) {
QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Unable to perform authentication");
}
sasl_security_properties_t secprops;
-
+
//TODO: should the actual SSF values be configurable here?
secprops.min_ssf = encrypt ? 10: 0;
secprops.max_ssf = 256;
@@ -319,14 +330,14 @@ void CyrusAuthenticator::init()
secprops.property_values = 0;
secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
/*
- * The nodict flag restricts SASL authentication mechanisms
- * to those that are not susceptible to dictionary attacks.
- * They are:
+ * The nodict flag restricts SASL authentication mechanisms
+ * to those that are not susceptible to dictionary attacks.
+ * They are:
* SRP
* PASSDSS-3DES-1
* EXTERNAL
*/
- if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
+ if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
if (result != SASL_OK) {
throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
@@ -371,10 +382,10 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
"", separator, "",
&list, &list_len,
&count);
-
+
if (SASL_OK != code) {
QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Mechanism listing failed");
@@ -382,17 +393,17 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
string mechanism;
unsigned int start;
unsigned int end;
-
+
QPID_LOG(info, "SASL: Mechanism list: " << list);
-
+
end = 0;
do {
start = end;
-
+
// Seek to end of next mechanism
while (end < list_len && separator[0] != list[end])
end++;
-
+
// Record the mechanism
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start))));
end++;
@@ -404,20 +415,20 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response)
{
const char *challenge;
unsigned int challenge_len;
-
+
// This should be at same debug level as mech list in getMechanisms().
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
int code = sasl_server_start(sasl_conn,
mechanism.c_str(),
(response ? response->c_str() : 0), (response ? response->size() : 0),
&challenge, &challenge_len);
-
+
processAuthenticationStep(code, challenge, challenge_len);
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslMechanism(mechanism);
}
-
+
void CyrusAuthenticator::step(const string& response)
{
const char *challenge;
@@ -439,10 +450,17 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen
// authentication failure, when one is available
throw ConnectionForcedException("Authenticated username unavailable");
}
- QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
connection.setUserId(uid);
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
+
+ QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
+
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
} else if (SASL_CONTINUE == code) {
string challenge_str(challenge, challenge_len);
@@ -490,7 +508,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslSsf(ssf);
return securityLayer;
}
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.h b/cpp/src/qpid/broker/SaslAuthenticator.h
index 4e5d43214c..e5ecc9f6ec 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.h
+++ b/cpp/src/qpid/broker/SaslAuthenticator.h
@@ -54,7 +54,7 @@ public:
static void init(const std::string& saslName, std::string const & saslConfigPath );
static void fini(void);
- static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow);
+ static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
virtual void callUserIdCallbacks() { }
};
diff --git a/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 754b443c22..757f6efc59 100644
--- a/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));
@@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
return sc.release();
}
-
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 64924bdd4c..9a84db547c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
- closeComplete(false)
+ closeComplete(false),
+ connectionId(getSession().getConnection().getUrl())
{}
SemanticState::~SemanticState() {
@@ -142,6 +143,7 @@ bool SemanticState::cancel(const string& tag)
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
+ getSession().setUnackedCount(unacked.size());
return true;
} else {
return false;
@@ -270,6 +272,7 @@ void SemanticState::checkDtxTimeout()
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
+ getSession().setUnackedCount(unacked.size());
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -426,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(session.getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
}
}
c->cancel();
@@ -555,6 +558,7 @@ void SemanticState::recover(bool requeue)
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +716,7 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, range.end);
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +728,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
+ getSession().setUnackedCount(unacked.size());
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +816,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +826,7 @@ void SemanticState::completed(const SequenceSet& commands) {
bind(&SemanticState::complete, this, _1)));
unacked.erase(removed, unacked.end());
requestDispatch();
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::attached()
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index e5e1d2da16..15928ce599 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -146,6 +146,8 @@ class SemanticState : private boost::noncopyable {
std::string getResumeId() const { return resumeId; };
const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
+ uint32_t getDeliveryCount() const { return deliveryCount; }
+ void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
@@ -180,6 +182,8 @@ class SemanticState : private boost::noncopyable {
const bool authMsg;
const std::string userID;
bool closeComplete;
+ //needed for queue delete events in auto-delete:
+ const std::string connectionId;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 78f2e43ce0..ae994a6bd5 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -41,6 +41,8 @@
namespace qpid {
namespace broker {
+using std::string;
+
using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
@@ -107,6 +109,12 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
false,
ManagementAgent::toMap(args),
"existing"));
+ QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " type:" << type
+ << " alternateExchange:" << alternateExchange
+ << " durable:" << (durable ? "T" : "F"));
}
}catch(UnknownExchangeTypeException& /*e*/){
throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
@@ -204,7 +212,10 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
}
}
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+ : HandlerHelper(session), broker(getBroker()),
+ //record connection id and userid for deleting exclsuive queues after session has ended:
+ connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
{}
@@ -223,7 +234,7 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
+ Queue::tryAutoDelete(broker, q, connectionId, userId);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
@@ -307,6 +318,14 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
"existing"));
+ QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " durable:" << (durable ? "T" : "F")
+ << " exclusive:" << (exclusive ? "T" : "F")
+ << " autodelete:" << (autoDelete ? "T" : "F")
+ << " alternateExchange:" << alternateExchange
+ );
}
}
@@ -411,6 +430,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
+ QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
+ << " destination:" << destination
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " exclusive:" << (exclusive ? "T" : "F")
+ );
}
void
@@ -423,6 +448,9 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+ QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl() );
}
void
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index bc056538b1..3cc745f96c 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -121,6 +121,9 @@ class Queue;
{
Broker& broker;
std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+ //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues
+ std::string connectionId;
+ std::string userId;
public:
QueueHandlerImpl(SemanticState& session);
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
index 253ce8dcf2..ee98da1878 100644
--- a/cpp/src/qpid/broker/SessionContext.h
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
+ virtual void setUnackedCount(uint64_t) {}
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index b58c7c01c5..23fa2ee0ca 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out),
- clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
+ clusterOrderProxy(c.getClusterOrderOutput() ?
+ new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
-void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+void SessionHandler::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
// NOTE: must tell the error listener _before_ calling connection.close()
- if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
+ if (connection.getErrorListener())
+ connection.getErrorListener()->connectionError(msg);
+ if (errorListener)
+ errorListener->connectionException(code, msg);
connection.close(code, msg);
}
-void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->channelException(code, msg);
}
-void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->executionException(code, msg);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
@@ -64,7 +80,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
- if (detachedCallback) detachedCallback();
+ if (errorListener) errorListener->detach();
connection.closeChannel(channel.get());
}
@@ -118,8 +134,4 @@ void SessionHandler::attached(const std::string& name)
}
}
-void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
- detachedCallback = cb;
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 4e2cfaa963..ab87cf41a4 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -25,7 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
class SessionState;
@@ -43,6 +43,21 @@ class SessionState;
*/
class SessionHandler : public amqp_0_10::SessionHandler {
public:
+ class ErrorListener {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void connectionException(
+ framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(
+ framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(
+ framing::execution::ErrorCode, const std::string& msg) = 0;
+ /** Called when it is safe to delete the ErrorListener. */
+ virtual void detach() = 0;
+ };
+
+ /**
+ *@param e must not be deleted until ErrorListener::detach has been called */
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
@@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
- void setDetachedCallback(boost::function<void()> cb);
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
protected:
virtual void setState(const std::string& sessionName, bool force);
@@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
- boost::function<void ()> detachedCallback;
+ boost::shared_ptr<ErrorListener> errorListener;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 99407bc3a6..cc02d9ec94 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -156,7 +156,7 @@ ManagementObject* SessionState::GetManagementObject (void) const
Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
Args& /*args*/,
- string& /*text*/)
+ std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 8db232a2d6..a8ff7feff9 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -126,6 +126,11 @@ class SessionState : public qpid::SessionState,
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
+ void setUnackedCount(uint64_t count) {
+ if (mgmtObject)
+ mgmtObject->set_unackedMessages(count);
+ }
+
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp
index 8cd2edda76..fa8df6406b 100644
--- a/cpp/src/qpid/broker/System.cpp
+++ b/cpp/src/qpid/broker/System.cpp
@@ -37,7 +37,6 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
if (agent != 0)
{
- framing::Uuid systemId;
if (_dataDir.empty ())
{
@@ -66,14 +65,13 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
}
mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
- std::string sysname, nodename, release, version, machine;
- qpid::sys::SystemInfo::getSystemId (sysname,
- nodename,
+ qpid::sys::SystemInfo::getSystemId (osName,
+ nodeName,
release,
version,
machine);
- mgmtObject->set_osName (sysname);
- mgmtObject->set_nodeName (nodename);
+ mgmtObject->set_osName (osName);
+ mgmtObject->set_nodeName (nodeName);
mgmtObject->set_release (release);
mgmtObject->set_version (version);
mgmtObject->set_machine (machine);
diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h
index 0fc2c2bd88..6847c662ae 100644
--- a/cpp/src/qpid/broker/System.h
+++ b/cpp/src/qpid/broker/System.h
@@ -21,6 +21,7 @@
//
#include "qpid/management/Manageable.h"
+#include "qpid/framing/Uuid.h"
#include "qmf/org/apache/qpid/broker/System.h"
#include <boost/shared_ptr.hpp>
#include <string>
@@ -35,6 +36,8 @@ class System : public management::Manageable
private:
qmf::org::apache::qpid::broker::System* mgmtObject;
+ framing::Uuid systemId;
+ std::string osName, nodeName, release, version, machine;
public:
@@ -44,6 +47,20 @@ class System : public management::Manageable
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
+
+
+ /** Persistent UUID assigned by the management system to this broker. */
+ framing::Uuid getSystemId() const { return systemId; }
+ /** Returns the OS name; e.g., GNU/Linux or Windows */
+ std::string getOsName() const { return osName; }
+ /** Returns the node name. Usually the same as the host name. */
+ std::string getNodeName() const { return nodeName; }
+ /** Returns the OS release identifier. */
+ std::string getRelease() const { return release; }
+ /** Returns the OS release version (kernel, build, sp, etc.) */
+ std::string getVersion() const { return version; }
+ /** Returns the hardware type. */
+ std::string getMachine() const { return machine; }
};
}}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index dd3ec13019..c11389bb17 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -32,20 +32,8 @@ using namespace qpid::sys;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// Areas for improvement:
-// - excessive string copying: should be 0 copy, match from original buffer.
-// - match/lookup: use descision tree or other more efficient structure.
-
-namespace
-{
-const std::string STAR("*");
-const std::string HASH("#");
-}
-
// iterator for federation ReOrigin bind operation
-class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator {
+class TopicExchange::ReOriginIter : public BindingNode::TreeIterator {
public:
ReOriginIter() {};
~ReOriginIter() {};
@@ -61,7 +49,7 @@ public:
// match iterator used by route(): builds BindingList of all unique queues
// that match the routing key.
-class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator {
+class TopicExchange::BindingsFinderIter : public BindingNode::TreeIterator {
public:
BindingsFinderIter(BindingList &bl) : b(bl) {};
~BindingsFinderIter() {};
@@ -85,7 +73,7 @@ public:
// Iterator to visit all bindings until a given queue is found
-class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator {
+class TopicExchange::QueueFinderIter : public BindingNode::TreeIterator {
public:
QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {};
~QueueFinderIter() {};
@@ -107,58 +95,7 @@ public:
};
-// Iterate over a string of '.'-separated tokens.
-struct TopicExchange::TokenIterator {
- typedef pair<const char*,const char*> Token;
-
- TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {}
-
- TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {}
-
- bool finished() const { return !token.first; }
-
- void next() {
- if (token.second == end)
- token.first = token.second = 0;
- else {
- token.first=token.second+1;
- token.second=(find(token.first, end, '.'));
- }
- }
-
- void pop(string &top) {
- ptrdiff_t l = len();
- if (l) {
- top.assign(token.first, l);
- } else top.clear();
- next();
- }
-
- bool match1(char c) const {
- return token.second==token.first+1 && *token.first == c;
- }
-
- bool match(const Token& token2) const {
- ptrdiff_t l=len();
- return l == token2.second-token2.first &&
- strncmp(token.first, token2.first, l) == 0;
- }
-
- bool match(const string& str) const {
- ptrdiff_t l=len();
- return l == ptrdiff_t(str.size()) &&
- str.compare(0, l, token.first, l) == 0;
- }
-
- ptrdiff_t len() const { return token.second - token.first; }
-
-
- const char* end;
- Token token;
-};
-
-
-class TopicExchange::Normalizer : public TopicExchange::TokenIterator {
+class TopicExchange::Normalizer : public TokenIterator {
public:
Normalizer(string& p)
: TokenIterator(&p[0], &p[0]+p.size()), pattern(p)
@@ -230,7 +167,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
- BindingKey *bk = bindingTree.addBindingKey(routingPattern);
+ BindingKey *bk = bindingTree.add(routingPattern);
if (bk) {
Binding::vector& qv(bk->bindingVector);
Binding::vector::iterator q;
@@ -324,7 +261,7 @@ bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
nBindings--;
if(qv.empty()) {
- bindingTree.removeBindingKey(routingKey);
+ bindingTree.remove(routingKey);
}
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
@@ -340,7 +277,7 @@ bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern)
{
// Note well: lock held by caller....
- BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern
+ BindingKey *bk = bindingTree.get(pattern); // Exact match against binding pattern
if (!bk) return 0;
Binding::vector& qv(bk->bindingVector);
Binding::vector::iterator q;
@@ -385,7 +322,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
} else if (!routingKey && !queue) {
return nBindings > 0;
} else if (routingKey) {
- if (bindingTree.getBindingKey(*routingKey)) {
+ if (bindingTree.get(*routingKey)) {
return true;
}
} else {
@@ -400,294 +337,4 @@ TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
-//
-// class BindingNode
-//
-
-TopicExchange::BindingNode::~BindingNode()
-{
- childTokens.clear();
-}
-
-
-// Add a binding pattern to the tree. Return a pointer to the binding key
-// of the node that matches the binding pattern.
-TopicExchange::BindingKey*
-TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute)
-{
- TokenIterator bKey(normalizedRoute);
- return addBindingKey(bKey, normalizedRoute);
-}
-
-
-// Return a pointer to the binding key of the leaf node that matches the binding pattern.
-TopicExchange::BindingKey*
-TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute)
-{
- TokenIterator bKey(normalizedRoute);
- return getBindingKey(bKey);
-}
-
-
-// Delete the binding associated with the given route.
-void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute)
-{
- TokenIterator bKey2(normalizedRoute);
- removeBindingKey(bKey2, normalizedRoute);
-}
-
-// visit each node in the tree. Note: all nodes are visited,
-// even non-leaf nodes (i.e. nodes without any bindings)
-bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter)
-{
- if (!iter.visit(*this)) return false;
-
- if (starChild && !starChild->iterateAll(iter)) return false;
-
- if (hashChild && !hashChild->iterateAll(iter)) return false;
-
- for (ChildMap::iterator ptr = childTokens.begin();
- ptr != childTokens.end(); ptr++) {
-
- if (!ptr->second->iterateAll(iter)) return false;
- }
-
- return true;
-}
-
-// applies iter against only matching nodes until iter returns false
-// Note Well: the iter may match against the same node more than once
-// if # wildcards are present!
-bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter)
-{
- TopicExchange::TokenIterator rKey(routingKey);
- return iterateMatch( rKey, iter );
-}
-
-
-// recurse over binding using token iterator.
-// Note well: bkey is modified!
-TopicExchange::BindingKey*
-TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey,
- const string& fullPattern)
-{
- if (bKey.finished()) {
- // this node's binding
- if (routePattern.empty()) {
- routePattern = fullPattern;
- } else assert(routePattern == fullPattern);
-
- return &bindings;
-
- } else {
- // pop the topmost token & recurse...
-
- if (bKey.match(STAR)) {
- if (!starChild) {
- starChild.reset(new StarNode());
- }
- bKey.next();
- return starChild->addBindingKey(bKey, fullPattern);
-
- } else if (bKey.match(HASH)) {
- if (!hashChild) {
- hashChild.reset(new HashNode());
- }
- bKey.next();
- return hashChild->addBindingKey(bKey, fullPattern);
-
- } else {
- ChildMap::iterator ptr;
- std::string next_token;
- bKey.pop(next_token);
- ptr = childTokens.find(next_token);
- if (ptr != childTokens.end()) {
- return ptr->second->addBindingKey(bKey, fullPattern);
- } else {
- BindingNode::shared_ptr child(new BindingNode(next_token));
- childTokens[next_token] = child;
- return child->addBindingKey(bKey, fullPattern);
- }
- }
- }
-}
-
-
-// Remove a binding pattern from the tree. Return true if the current
-// node becomes a leaf without any bindings (therefore can be deleted).
-// Note Well: modifies parameter bKey's value!
-bool
-TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey,
- const string& fullPattern)
-{
- bool remove;
-
- if (!bKey.finished()) {
-
- if (bKey.match(STAR)) {
- bKey.next();
- if (starChild) {
- remove = starChild->removeBindingKey(bKey, fullPattern);
- if (remove) {
- starChild.reset();
- }
- }
- } else if (bKey.match(HASH)) {
- bKey.next();
- if (hashChild) {
- remove = hashChild->removeBindingKey(bKey, fullPattern);
- if (remove) {
- hashChild.reset();
- }
- }
- } else {
- ChildMap::iterator ptr;
- std::string next_token;
- bKey.pop(next_token);
- ptr = childTokens.find(next_token);
- if (ptr != childTokens.end()) {
- remove = ptr->second->removeBindingKey(bKey, fullPattern);
- if (remove) {
- childTokens.erase(ptr);
- }
- }
- }
- }
-
- // no bindings and no children == parent can delete this node.
- return getChildCount() == 0 && bindings.bindingVector.empty();
-}
-
-
-// find the binding key that matches the given binding pattern.
-// Note Well: modifies key parameter!
-TopicExchange::BindingKey*
-TopicExchange::BindingNode::getBindingKey(TokenIterator &key)
-{
- if (key.finished()) {
- return &bindings;
- }
-
- string next_token;
-
- key.pop(next_token);
-
- if (next_token == STAR) {
- if (starChild)
- return starChild->getBindingKey(key);
- } else if (next_token == HASH) {
- if (hashChild)
- return hashChild->getBindingKey(key);
- } else {
- ChildMap::iterator ptr;
- ptr = childTokens.find(next_token);
- if (ptr != childTokens.end()) {
- return ptr->second->getBindingKey(key);
- }
- }
-
- return 0;
-}
-
-
-
-// iterate over all nodes that match the given key. Note well: the set of nodes
-// that are visited includes matching non-leaf nodes.
-// Note well: parameter key is modified!
-bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
-{
- // invariant: key has matched all previous tokens up to this node.
- if (key.finished()) {
- // exact match this node: visit if bound
- if (!bindings.bindingVector.empty())
- if (!iter.visit(*this)) return false;
- }
-
- // check remaining key against children, even if empty.
- return iterateMatchChildren(key, iter);
-}
-
-
-TopicExchange::StarNode::StarNode()
- : BindingNode(STAR) {}
-
-
-// See iterateMatch() above.
-// Special case: this node must verify a token is available (match exactly one).
-bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
-{
- // must match one token:
- if (key.finished())
- return true; // match failed, but continue iteration on siblings
-
- // pop the topmost token
- key.next();
-
- if (key.finished()) {
- // exact match this node: visit if bound
- if (!bindings.bindingVector.empty())
- if (!iter.visit(*this)) return false;
- }
-
- return iterateMatchChildren(key, iter);
-}
-
-
-TopicExchange::HashNode::HashNode()
- : BindingNode(HASH) {}
-
-
-// See iterateMatch() above.
-// Special case: can match zero or more tokens at the head of the key.
-bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
-{
- // consume each token and look for a match on the
- // remaining key.
- while (!key.finished()) {
- if (!iterateMatchChildren(key, iter)) return false;
- key.next();
- }
-
- if (!bindings.bindingVector.empty())
- return iter.visit(*this);
-
- return true;
-}
-
-
-// helper: iterate over current node's matching children
-bool
-TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key,
- TopicExchange::BindingNode::TreeIterator& iter)
-{
- // always try glob - it can match empty keys
- if (hashChild) {
- TokenIterator tmp(key);
- if (!hashChild->iterateMatch(tmp, iter))
- return false;
- }
-
- if (!key.finished()) {
-
- if (starChild) {
- TokenIterator tmp(key);
- if (!starChild->iterateMatch(tmp, iter))
- return false;
- }
-
- if (!childTokens.empty()) {
- TokenIterator newKey(key);
- std::string next_token;
- newKey.pop(next_token);
-
- ChildMap::iterator ptr = childTokens.find(next_token);
- if (ptr != childTokens.end()) {
- return ptr->second->iterateMatch(newKey, iter);
- }
- }
- }
-
- return true;
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index cc24e1411e..46871a1c6b 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -28,6 +28,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
namespace qpid {
@@ -35,7 +36,6 @@ namespace broker {
class TopicExchange : public virtual Exchange {
- struct TokenIterator;
class Normalizer;
struct BindingKey { // binding for this node
@@ -43,129 +43,44 @@ class TopicExchange : public virtual Exchange {
FedBinding fedBinding;
};
- // Binding database:
- // The dotted form of a binding key is broken up and stored in a directed tree graph.
- // Common binding prefix are merged. This allows the route match alogrithm to quickly
- // isolate those sub-trees that match a given routingKey.
- // For example, given the routes:
- // a.b.c.<...>
- // a.b.d.<...>
- // a.x.y.<...>
- // The resulting tree would be:
- // a-->b-->c-->...
- // | +-->d-->...
- // +-->x-->y-->...
- //
- class QPID_BROKER_CLASS_EXTERN BindingNode {
- public:
-
- typedef boost::shared_ptr<BindingNode> shared_ptr;
-
- // for database transversal (visit a node).
- class TreeIterator {
- public:
- TreeIterator() {};
- virtual ~TreeIterator() {};
- virtual bool visit(BindingNode& node) = 0;
- };
-
- BindingNode() {};
- BindingNode(const std::string& token) : token(token) {};
- QPID_BROKER_EXTERN virtual ~BindingNode();
-
- // add normalizedRoute to tree, return associated BindingKey
- QPID_BROKER_EXTERN BindingKey* addBindingKey(const std::string& normalizedRoute);
-
- // return BindingKey associated with normalizedRoute
- QPID_BROKER_EXTERN BindingKey* getBindingKey(const std::string& normalizedRoute);
-
- // remove BindingKey associated with normalizedRoute
- QPID_BROKER_EXTERN void removeBindingKey(const std::string& normalizedRoute);
-
- // applies iter against each node in tree until iter returns false
- QPID_BROKER_EXTERN bool iterateAll(TreeIterator& iter);
-
- // applies iter against only matching nodes until iter returns false
- QPID_BROKER_EXTERN bool iterateMatch(const std::string& routingKey, TreeIterator& iter);
-
- std::string routePattern; // normalized binding that matches this node
- BindingKey bindings; // for matches against this node
-
- protected:
-
- std::string token; // portion of pattern represented by this node
-
- // children
- typedef std::map<const std::string, BindingNode::shared_ptr> ChildMap;
- ChildMap childTokens;
- BindingNode::shared_ptr starChild; // "*" subtree
- BindingNode::shared_ptr hashChild; // "#" subtree
-
- unsigned int getChildCount() { return childTokens.size() +
- (starChild ? 1 : 0) + (hashChild ? 1 : 0); }
- BindingKey* addBindingKey(TokenIterator& bKey,
- const std::string& fullPattern);
- bool removeBindingKey(TokenIterator& bKey,
- const std::string& fullPattern);
- BindingKey* getBindingKey(TokenIterator& bKey);
- QPID_BROKER_EXTERN virtual bool iterateMatch(TokenIterator& rKey, TreeIterator& iter);
- bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter);
- };
-
- // Special case: ("*" token) Node in the tree for a match exactly one wildcard
- class StarNode : public BindingNode {
- public:
- StarNode();
- ~StarNode() {};
-
- protected:
- virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
- };
+ typedef TopicKeyNode<BindingKey> BindingNode;
- // Special case: ("#" token) Node in the tree for a match zero or more
- class HashNode : public BindingNode {
- public:
- HashNode();
- ~HashNode() {};
+ BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
+ bool deleteBinding(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ BindingKey *bk);
- protected:
- virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
- };
+ class ReOriginIter;
+ class BindingsFinderIter;
+ class QueueFinderIter;
BindingNode bindingTree;
unsigned long nBindings;
qpid::sys::RWlock lock; // protects bindingTree and nBindings
qpid::sys::RWlock cacheLock; // protects cache
std::map<std::string, BindingList> bindingCache; // cache of matched routes.
+
class ClearCache {
private:
qpid::sys::RWlock* cacheLock;
std::map<std::string, BindingList>* bindingCache;
- bool cleared;
+ bool cleared;
public:
- ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
- bindingCache(bc),cleared(false) {};
+ ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc) :
+ cacheLock(l), bindingCache(bc),cleared(false) {};
void clearCache() {
- qpid::sys::RWlock::ScopedWlock l(*cacheLock);
- if (!cleared) {
- bindingCache->clear();
- cleared =true;
- }
+ qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+ if (!cleared) {
+ bindingCache->clear();
+ cleared =true;
+ }
};
~ClearCache(){
- clearCache();
+ clearCache();
};
};
- BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
- bool deleteBinding(Queue::shared_ptr queue,
- const std::string& routingKey,
- BindingKey *bk);
- class ReOriginIter;
- class BindingsFinderIter;
- class QueueFinderIter;
-
- public:
+public:
static const std::string typeName;
static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
@@ -199,7 +114,6 @@ class TopicExchange : public virtual Exchange {
};
-
}
}
diff --git a/cpp/src/qpid/broker/TopicKeyNode.h b/cpp/src/qpid/broker/TopicKeyNode.h
new file mode 100644
index 0000000000..7671ed069d
--- /dev/null
+++ b/cpp/src/qpid/broker/TopicKeyNode.h
@@ -0,0 +1,371 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QPID_BROKER_TOPIC_KEY_NODE_
+#define _QPID_BROKER_TOPIC_KEY_NODE_
+
+#include "qpid/broker/BrokerImportExport.h"
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <string>
+#include <string.h>
+
+
+namespace qpid {
+namespace broker {
+
+static const std::string STAR("*");
+static const std::string HASH("#");
+
+
+// Iterate over a string of '.'-separated tokens.
+struct TokenIterator {
+ typedef std::pair<const char*,const char*> Token;
+
+ TokenIterator(const char* b, const char* e) : end(e), token(std::make_pair(b, std::find(b,e,'.'))) {}
+
+ TokenIterator(const std::string& key) : end(&key[0]+key.size()), token(std::make_pair(&key[0], std::find(&key[0],end,'.'))) {}
+
+ bool finished() const { return !token.first; }
+
+ void next() {
+ if (token.second == end)
+ token.first = token.second = 0;
+ else {
+ token.first=token.second+1;
+ token.second=(std::find(token.first, end, '.'));
+ }
+ }
+
+ void pop(std::string &top) {
+ ptrdiff_t l = len();
+ if (l) {
+ top.assign(token.first, l);
+ } else top.clear();
+ next();
+ }
+
+ bool match1(char c) const {
+ return token.second==token.first+1 && *token.first == c;
+ }
+
+ bool match(const Token& token2) const {
+ ptrdiff_t l=len();
+ return l == token2.second-token2.first &&
+ strncmp(token.first, token2.first, l) == 0;
+ }
+
+ bool match(const std::string& str) const {
+ ptrdiff_t l=len();
+ return l == ptrdiff_t(str.size()) &&
+ str.compare(0, l, token.first, l) == 0;
+ }
+
+ ptrdiff_t len() const { return token.second - token.first; }
+
+
+ const char* end;
+ Token token;
+};
+
+
+// Binding database:
+// The dotted form of a binding key is broken up and stored in a directed tree graph.
+// Common binding prefix are merged. This allows the route match alogrithm to quickly
+// isolate those sub-trees that match a given routingKey.
+// For example, given the routes:
+// a.b.c.<...>
+// a.b.d.<...>
+// a.x.y.<...>
+// The resulting tree would be:
+// a-->b-->c-->...
+// | +-->d-->...
+// +-->x-->y-->...
+//
+template <class T>
+class QPID_BROKER_CLASS_EXTERN TopicKeyNode {
+
+ public:
+
+ typedef boost::shared_ptr<TopicKeyNode> shared_ptr;
+
+ // for database transversal (visit a node).
+ class TreeIterator {
+ public:
+ TreeIterator() {};
+ virtual ~TreeIterator() {};
+ virtual bool visit(TopicKeyNode& node) = 0;
+ };
+
+ TopicKeyNode() : isStar(false), isHash(false) {}
+ TopicKeyNode(const std::string& _t) : token(_t), isStar(_t == STAR), isHash(_t == HASH) {}
+ QPID_BROKER_EXTERN virtual ~TopicKeyNode() {
+ childTokens.clear();
+ }
+
+ // add normalizedRoute to tree, return associated T
+ QPID_BROKER_EXTERN T* add(const std::string& normalizedRoute) {
+ TokenIterator bKey(normalizedRoute);
+ return add(bKey, normalizedRoute);
+ }
+
+ // return T associated with normalizedRoute
+ QPID_BROKER_EXTERN T* get(const std::string& normalizedRoute) {
+ TokenIterator bKey(normalizedRoute);
+ return get(bKey);
+ }
+
+ // remove T associated with normalizedRoute
+ QPID_BROKER_EXTERN void remove(const std::string& normalizedRoute) {
+ TokenIterator bKey2(normalizedRoute);
+ remove(bKey2, normalizedRoute);
+ }
+
+ // applies iter against each node in tree until iter returns false
+ QPID_BROKER_EXTERN bool iterateAll(TreeIterator& iter) {
+ if (!iter.visit(*this)) return false;
+ if (starChild && !starChild->iterateAll(iter)) return false;
+ if (hashChild && !hashChild->iterateAll(iter)) return false;
+ for (typename ChildMap::iterator ptr = childTokens.begin();
+ ptr != childTokens.end(); ptr++) {
+ if (!ptr->second->iterateAll(iter)) return false;
+ }
+ return true;
+ }
+
+ // applies iter against only matching nodes until iter returns false
+ QPID_BROKER_EXTERN bool iterateMatch(const std::string& routingKey, TreeIterator& iter) {
+ TokenIterator rKey(routingKey);
+ return iterateMatch( rKey, iter );
+ }
+
+ std::string routePattern; // normalized binding that matches this node
+ T bindings; // for matches against this node
+
+ private:
+
+ std::string token; // portion of pattern represented by this node
+ bool isStar;
+ bool isHash;
+
+ // children
+ typedef std::map<const std::string, typename TopicKeyNode::shared_ptr> ChildMap;
+ ChildMap childTokens;
+ typename TopicKeyNode::shared_ptr starChild; // "*" subtree
+ typename TopicKeyNode::shared_ptr hashChild; // "#" subtree
+
+ unsigned int getChildCount() { return childTokens.size() +
+ (starChild ? 1 : 0) + (hashChild ? 1 : 0); }
+
+ T* add(TokenIterator& bKey, const std::string& fullPattern){
+ if (bKey.finished()) {
+ // this node's binding
+ if (routePattern.empty()) {
+ routePattern = fullPattern;
+ } else assert(routePattern == fullPattern);
+
+ return &bindings;
+
+ } else {
+ // pop the topmost token & recurse...
+
+ if (bKey.match(STAR)) {
+ if (!starChild) {
+ starChild.reset(new TopicKeyNode<T>(STAR));
+ }
+ bKey.next();
+ return starChild->add(bKey, fullPattern);
+
+ } else if (bKey.match(HASH)) {
+ if (!hashChild) {
+ hashChild.reset(new TopicKeyNode<T>(HASH));
+ }
+ bKey.next();
+ return hashChild->add(bKey, fullPattern);
+
+ } else {
+ typename ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->add(bKey, fullPattern);
+ } else {
+ typename TopicKeyNode::shared_ptr child(new TopicKeyNode<T>(next_token));
+ childTokens[next_token] = child;
+ return child->add(bKey, fullPattern);
+ }
+ }
+ }
+ }
+
+
+ bool remove(TokenIterator& bKey, const std::string& fullPattern) {
+ bool remove;
+ if (!bKey.finished()) {
+ if (bKey.match(STAR)) {
+ bKey.next();
+ if (starChild) {
+ remove = starChild->remove(bKey, fullPattern);
+ if (remove) {
+ starChild.reset();
+ }
+ }
+ } else if (bKey.match(HASH)) {
+ bKey.next();
+ if (hashChild) {
+ remove = hashChild->remove(bKey, fullPattern);
+ if (remove) {
+ hashChild.reset();
+ }
+ }
+ } else {
+ typename ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ remove = ptr->second->remove(bKey, fullPattern);
+ if (remove) {
+ childTokens.erase(ptr);
+ }
+ }
+ }
+ }
+
+ // no bindings and no children == parent can delete this node.
+ return getChildCount() == 0 && bindings.bindingVector.empty();
+ }
+
+
+ T* get(TokenIterator& bKey) {
+ if (bKey.finished()) {
+ return &bindings;
+ }
+
+ std::string next_token;
+ bKey.pop(next_token);
+
+ if (next_token == STAR) {
+ if (starChild)
+ return starChild->get(bKey);
+ } else if (next_token == HASH) {
+ if (hashChild)
+ return hashChild->get(bKey);
+ } else {
+ typename ChildMap::iterator ptr;
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->get(bKey);
+ }
+ }
+
+ return 0;
+ }
+
+
+ bool iterateMatch(TokenIterator& rKey, TreeIterator& iter) {
+ if (isStar) return iterateMatchStar(rKey, iter);
+ if (isHash) return iterateMatchHash(rKey, iter);
+ return iterateMatchString(rKey, iter);
+ }
+
+
+ bool iterateMatchString(TokenIterator& rKey, TreeIterator& iter){
+ // invariant: key has matched all previous tokens up to this node.
+ if (rKey.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ // check remaining key against children, even if empty.
+ return iterateMatchChildren(rKey, iter);
+ }
+
+
+ bool iterateMatchStar(TokenIterator& rKey, TreeIterator& iter) {
+ // must match one token:
+ if (rKey.finished())
+ return true; // match failed, but continue iteration on siblings
+
+ // pop the topmost token
+ rKey.next();
+
+ if (rKey.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ return iterateMatchChildren(rKey, iter);
+ }
+
+
+ bool iterateMatchHash(TokenIterator& rKey, TreeIterator& iter) {
+ // consume each token and look for a match on the
+ // remaining key.
+ while (!rKey.finished()) {
+ if (!iterateMatchChildren(rKey, iter)) return false;
+ rKey.next();
+ }
+
+ if (!bindings.bindingVector.empty())
+ return iter.visit(*this);
+
+ return true;
+ }
+
+
+ bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter) {
+ // always try glob - it can match empty keys
+ if (hashChild) {
+ TokenIterator tmp(key);
+ if (!hashChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!key.finished()) {
+ if (starChild) {
+ TokenIterator tmp(key);
+ if (!starChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!childTokens.empty()) {
+ TokenIterator newKey(key);
+ std::string next_token;
+ newKey.pop(next_token);
+
+ typename ChildMap::iterator ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->iterateMatch(newKey, iter);
+ }
+ }
+ }
+
+ return true;
+ }
+};
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
index a38e6ac12a..40e74be018 100644
--- a/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
@@ -32,6 +32,8 @@
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
+using std::string;
+
namespace qpid {
namespace broker {
@@ -79,7 +81,7 @@ void SaslAuthenticator::fini(void)
return;
}
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool)
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c)
{
if (c.getBroker().getOptions().auth) {
return std::auto_ptr<SaslAuthenticator>(new SspiAuthenticator(c));
diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index 1dff1ddc8f..fb59d058f8 100644
--- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -44,26 +44,34 @@
namespace qpid {
namespace sys {
+
+class Timer;
+
namespace windows {
struct SslServerOptions : qpid::Options
{
std::string certStore;
+ std::string certStoreLocation;
std::string certName;
uint16_t port;
bool clientAuth;
SslServerOptions() : qpid::Options("SSL Options"),
- certStore("My"), port(5671), clientAuth(false)
+ certStore("My"),
+ certStoreLocation("CurrentUser"),
+ certName("localhost"),
+ port(5671),
+ clientAuth(false)
{
qpid::Address me;
if (qpid::sys::SystemInfo::getLocalHostname(me))
certName = me.host;
- else
- certName = "localhost";
addOptions()
("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
+ ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
+ "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
("ssl-require-client-authentication", optValue(clientAuth),
@@ -72,10 +80,12 @@ struct SslServerOptions : qpid::Options
};
class SslProtocolFactory : public qpid::sys::ProtocolFactory {
- const bool tcpNoDelay;
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
uint16_t listeningPort;
+ const bool tcpNoDelay;
std::string brokerHost;
const bool clientAuthSelected;
std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
@@ -83,7 +93,9 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory {
CredHandle credHandle;
public:
- SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay);
+ SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime);
~SslProtocolFactory();
void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
@@ -120,8 +132,8 @@ static struct SslPlugin : public Plugin {
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
"", boost::lexical_cast<std::string>(options.port),
- opts.connectionBacklog,
- opts.tcpNoDelay));
+ opts.connectionBacklog, opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime));
QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
@@ -132,9 +144,12 @@ static struct SslPlugin : public Plugin {
} sslPlugin;
SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
- const std::string& host, const std::string& port, int backlog,
- bool nodelay)
- : tcpNoDelay(nodelay),
+ const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime)
+ : brokerTimer(timer),
+ maxNegotiateTime(maxTime),
+ tcpNoDelay(nodelay),
clientAuthSelected(options.clientAuth) {
// Make sure that certificate store is good before listening to sockets
@@ -142,11 +157,25 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
SecInvalidateHandle(&credHandle);
// Get the certificate for this server.
+ DWORD flags = 0;
+ std::string certStoreLocation = options.certStoreLocation;
+ std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ if (certStoreLocation == "currentuser") {
+ flags = CERT_SYSTEM_STORE_CURRENT_USER;
+ } else if (certStoreLocation == "localmachine") {
+ flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
+ } else if (certStoreLocation == "currentservice") {
+ flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
+ } else {
+ QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
+ << " - Using default location");
+ }
HCERTSTORE certStoreHandle;
certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
X509_ASN_ENCODING,
0,
- CERT_SYSTEM_STORE_LOCAL_MACHINE,
+ flags |
+ CERT_STORE_READONLY_FLAG,
options.certStore.c_str());
if (!certStoreHandle)
throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
@@ -252,7 +281,7 @@ void SslProtocolFactory::established(sys::Poller::shared_ptr poller,
boost::bind(&AsynchIOHandler::idle, async, _1));
}
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}