summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
committerGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
commit0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch)
treed478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/broker
parent4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff)
downloadqpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections) 2) Improved handling of federation links: a) Links can be created even if the remote broker is not reachable b) If links are lost, re-establishment will occur using an exponential back-off algorithm 3) Durability of exchanges is now viewable through management 4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins. 5) General configuration storage capability has been added to the store/recover interface. This is used for federation links. 6) Management object-ids for durable objects are now themselves durable. (Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp43
-rw-r--r--cpp/src/qpid/broker/Bridge.h19
-rw-r--r--cpp/src/qpid/broker/Broker.cpp42
-rw-r--r--cpp/src/qpid/broker/Broker.h19
-rw-r--r--cpp/src/qpid/broker/Connection.cpp130
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp11
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp17
-rw-r--r--cpp/src/qpid/broker/Exchange.h2
-rw-r--r--cpp/src/qpid/broker/Link.cpp281
-rw-r--r--cpp/src/qpid/broker/Link.h115
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp102
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h87
-rw-r--r--cpp/src/qpid/broker/MessageStore.h11
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp10
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp15
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h3
-rw-r--r--cpp/src/qpid/broker/PersistableConfig.h45
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoverableConfig.h45
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp34
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h6
-rw-r--r--cpp/src/qpid/broker/System.h3
27 files changed, 876 insertions, 182 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 456eba7f9d..a8e7b3c368 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -31,10 +31,12 @@ using qpid::framing::Uuid;
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
- args(_args), channel(id, &(c.getOutput())), peer(channel),
- mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l), name(Uuid(true).str())
+Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+ const management::ArgsLinkBridge& _args) :
+ id(_id), args(_args),
+ mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
+ args.i_key, args.i_src_is_queue, args.i_src_is_local)),
+ listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -44,18 +46,21 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create()
+void Bridge::create(ConnectionState& c)
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.attach(name, false);
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
@@ -66,22 +71,22 @@ void Bridge::create()
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
+
bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
-
}
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().detach(name);
+ peer->getMessage().cancel(args.i_dest);
+ peer->getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
@@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man
if (methodId == management::Bridge::METHOD_CLOSE) {
//notify that we are closed
listener(this);
- //request time on the connections io thread
- connection.getOutput().activateOutput();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 943050e244..15efcc6482 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -28,33 +28,36 @@
#include "qpid/management/Bridge.h"
#include <boost/function.hpp>
+#include <memory>
namespace qpid {
namespace broker {
class ConnectionState;
+class Link;
class Bridge : public management::Manageable
{
public:
typedef boost::function<void(Bridge*)> CancellationListener;
- Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
- const management::ArgsLinkBridge& args);
+ Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
~Bridge();
- void create();
+ void create(ConnectionState& c);
void cancel();
management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
private:
- management::ArgsLinkBridge args;
- framing::ChannelHandler channel;
- framing::AMQP_ServerProxy peer;
- management::Bridge::shared_ptr mgmtObject;
- ConnectionState& connection;
+ std::auto_ptr<framing::ChannelHandler> channelHandler;
+ std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
+ std::auto_ptr<framing::AMQP_ServerProxy> peer;
+
+ framing::ChannelId id;
+ management::ArgsLinkBridge args;
+ management::Bridge::shared_ptr mgmtObject;
CancellationListener listener;
std::string name;
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e9b1db0413..d80c13f12a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -28,6 +28,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "Link.h"
#include "qpid/management/PackageQpid.h"
#include "qpid/management/ManagementExchange.h"
#include "qpid/management/ArgsBrokerEcho.h"
@@ -60,7 +61,7 @@ using qpid::sys::Dispatcher;
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
-using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -129,15 +130,16 @@ Broker::Broker(const Broker::Options& conf) :
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ links(this),
factory(*this),
sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
- ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval);
- managementAgent = ManagementAgent::getAgent ();
- managementAgent->setInterval (conf.mgmtPubInterval);
+ ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval, this);
+ managementAgent = management::ManagementAgent::getAgent ();
+ ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
@@ -163,6 +165,7 @@ Broker::Broker(const Broker::Options& conf) :
queues.setParent (vhost);
exchanges.setParent (vhost);
+ links.setParent (vhost);
}
// Early-Initialize plugins
@@ -178,11 +181,12 @@ Broker::Broker(const Broker::Options& conf) :
queues.setStore (store);
dtxManager.setStore (store);
+ links.setStore (store);
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if (store != 0) {
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
@@ -197,8 +201,9 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(qpid_management, ManagementExchange::typeName);
Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
- managementAgent->setExchange (mExchange, dExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent
+ ((ManagementBroker*) managementAgent.get());
}
else
QPID_LOG(info, "Management not enabled");
@@ -285,7 +290,7 @@ void Broker::shutdown() {
Broker::~Broker() {
shutdown();
- ManagementAgent::shutdown ();
+ ManagementBroker::shutdown ();
delete store;
if (config.auth) {
#if HAVE_SASL
@@ -319,7 +324,15 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_CONNECT : {
management::ArgsBrokerConnect& hp=
dynamic_cast<management::ArgsBrokerConnect&>(args);
- connect(hp.i_host, hp.i_port);
+
+ if (hp.i_useSsl)
+ return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
+
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+ if (hp.i_durable && response.second)
+ store->create(*response.first);
+
status = Manageable::STATUS_OK;
break;
}
@@ -355,10 +368,11 @@ void Broker::accept() {
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
- const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* f)
+ const std::string& host, uint16_t port, bool /*useSsl*/,
+ sys::ConnectionCodec::Factory* f,
+ sys::ProtocolAccess* access)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
}
void Broker::connect(
@@ -366,7 +380,7 @@ void Broker::connect(
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index e48f3dc23f..a1eaf4f62f 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -29,11 +29,12 @@
#include "ExchangeRegistry.h"
#include "MessageStore.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "SessionManager.h"
#include "Vhost.h"
#include "System.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/management/Broker.h"
#include "qpid/management/ArgsBrokerConnect.h"
#include "qpid/Options.h"
@@ -43,6 +44,7 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/ProtocolAccess.h"
#include <vector>
@@ -111,6 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
+ LinkRegistry& getLinks() { return links; }
uint64_t getStagingThreshold() { return config.stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
@@ -130,11 +133,16 @@ class Broker : public sys::Runnable, public Plugin::Target,
void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
- sys::ConnectionCodec::Factory* =0);
+ void connect(const std::string& host, uint16_t port, bool useSsl,
+ sys::ConnectionCodec::Factory* =0,
+ sys::ProtocolAccess* =0);
/** Create a connection to another broker. */
void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+ // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+ // For the present just return the first ProtocolFactory registered.
+ boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
+
private:
boost::shared_ptr<sys::Poller> poller;
Options config;
@@ -144,6 +152,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
QueueRegistry queues;
ExchangeRegistry exchanges;
+ LinkRegistry links;
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
@@ -152,10 +161,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
- // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
- // For the present just return the first ProtocolFactory registered.
- boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
-
void declareStandardExchange(const std::string& name, const std::string& type);
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1994c4fdf5..d156b4a914 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper
management::Client::shared_ptr mgmtClient;
public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming);
~MgmtClient();
void received(framing::AMQFrame& frame);
management::ManagementObject::shared_ptr getManagementObject() const;
void closing();
};
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(Connection& connection, const management::Args& args);
-};
-
-
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
@@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink)
if (agent.get () != 0)
{
if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
} else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
}
}
}
}
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+ ioCallback = callback;
+ out->activateOutput();
+}
+
+
Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::doOutput()
{
try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (ioCallback)
+ ioCallback(); // Lend the IO thread for management processing
+ ioCallback = 0;
if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
@@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const
return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
out->activateOutput();
status = Manageable::STATUS_OK;
break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
}
return status;
}
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+ ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming)
{
mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
+ (new management::Client (conn, parent, mgmtId, incoming));
agent->addObject (mgmtClient);
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index c8e7fb7079..dff1e0653b 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -54,6 +54,7 @@ class Connection : public sys::ConnectionInputHandler,
public ConnectionState
{
public:
+ typedef boost::shared_ptr<Connection> shared_ptr;
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -78,6 +79,7 @@ class Connection : public sys::ConnectionInputHandler,
ManagementMethod (uint32_t methodId, management::Args& args);
void initMgmt(bool asLink = false);
+ void requestIOProcessing (boost::function0<void>);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +102,6 @@ class Connection : public sys::ConnectionInputHandler,
virtual void process(Connection&, const management::Args&){}
};
class MgmtClient;
- class MgmtLink;
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
@@ -108,6 +109,7 @@ class Connection : public sys::ConnectionInputHandler,
std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
const std::string mgmtId;
+ boost::function0<void> ioCallback;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 5de5a0230a..cd015ce4f5 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
}
sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true);
+ return new amqp_0_10::Connection(out, broker, id, true, a);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index 5797495054..bf55ab3b88 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -24,6 +24,7 @@
#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
+namespace sys { class ProtocolAccess; }
namespace broker {
class Broker;
@@ -37,7 +38,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory {
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id);
+ create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
private:
Broker& broker;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 4ed2f5bfa2..162664fb88 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -35,8 +35,9 @@ using namespace qpid::framing;
namespace
{
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
+const std::string ANONYMOUS = "ANONYMOUS";
+const std::string PLAIN = "PLAIN";
+const std::string en_US = "en_US";
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
@@ -135,10 +136,8 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
- string uid = "qpidd";
- string pwd = "qpidd";
- string response = ((char)0) + uid + ((char)0) + pwd;
- server.startOk(FieldTable(), PLAIN, response, en_US);
+ string response;
+ server.startOk(FieldTable(), ANONYMOUS, response, en_US);
connection.initMgmt(true);
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 47d616cf16..0d9ffb7122 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
+ (new management::Exchange (this, parent, _name, durable));
agent->addObject (mgmtExchange);
}
}
@@ -56,8 +56,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name));
- agent->addObject (mgmtExchange);
+ (new management::Exchange (this, parent, _name, durable));
+ if (!durable)
+ agent->addObject (mgmtExchange);
}
}
}
@@ -68,6 +69,16 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
+void Exchange::setPersistenceId(uint64_t id) const
+{
+ if (mgmtExchange != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtExchange, id, 2);
+ }
+ persistenceId = id;
+}
+
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
string name;
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 7902eb4219..9b18129857 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -90,7 +90,7 @@ namespace qpid {
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
//PersistableExchange:
- void setPersistenceId(uint64_t id) const { persistenceId = id; }
+ void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
new file mode 100644
index 0000000000..83c9a2a62e
--- /dev/null
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Link.h"
+#include "LinkRegistry.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Link.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+Link::Link(LinkRegistry* _links,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ Broker* _broker,
+ management::Manageable* parent)
+ : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ persistenceId(0), broker(_broker), state(0),
+ access(boost::bind(&Link::established, this),
+ boost::bind(&Link::closed, this, _1, _2),
+ boost::bind(&Link::setConnection, this, _1)),
+ visitCount(0),
+ currentInterval(1),
+ closing(false),
+ channelCounter(1)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0)
+ {
+ mgmtObject = management::Link::shared_ptr
+ (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ if (!durable)
+ agent->addObject(mgmtObject);
+ }
+ }
+ setState(STATE_WAITING);
+}
+
+Link::~Link ()
+{
+ if (state == STATE_OPERATIONAL)
+ access.close();
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Link::setState (int newState)
+{
+ if (newState == state)
+ return;
+
+ state = newState;
+ if (mgmtObject.get() == 0)
+ return;
+
+ switch (state)
+ {
+ case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
+ case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
+ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ }
+}
+
+void Link::startConnection ()
+{
+ try {
+ broker->connect (host, port, useSsl, 0, &access);
+ setState(STATE_CONNECTING);
+ } catch(std::exception& e) {
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (e.what());
+ }
+}
+
+void Link::established ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
+ setState(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+}
+
+void Link::closed (int, std::string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_OPERATIONAL)
+ QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+
+ connection.reset();
+ created.transfer(created.end(), active.begin(), active.end(), active);
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ if (closing)
+ destroy();
+}
+
+void Link::destroy ()
+{
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ connection.reset();
+ links->destroy (host, port);
+}
+
+void Link::cancel(Bridge* bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //need to take this out of the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == bridge) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+
+ if (connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::ioThreadProcessing()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create(*connection);
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Link::setConnection(Connection::shared_ptr c)
+{
+ connection = c;
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::maintenanceVisit ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_WAITING)
+ {
+ visitCount++;
+ if (visitCount >= currentInterval)
+ {
+ visitCount = 0;
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnection();
+ }
+ }
+}
+
+void Link::setPersistenceId(uint64_t id) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject, id);
+ }
+ persistenceId = id;
+}
+
+const string& Link::getName() const
+{
+ return host;
+}
+
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ bool useSsl(buffer.getOctet());
+ bool durable(buffer.getOctet());
+
+ return links.declare(host, port, useSsl, durable).first;
+}
+
+void Link::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("link"));
+ buffer.putShortString(host);
+ buffer.putShort(port);
+ buffer.putOctet(useSsl ? 1 : 0);
+ buffer.putOctet(durable ? 1 : 0);
+}
+
+uint32_t Link::encodedSize() const
+{
+ return host.size() + 1 // short-string (host)
+ + 5 // short-string ("link")
+ + 2 // port
+ + 1 // useSsl
+ + 1; // durable
+}
+
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
+{
+ return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ switch (op)
+ {
+ case management::Link::METHOD_CLOSE :
+ closing = true;
+ if (state != STATE_CONNECTING)
+ destroy();
+ return Manageable::STATUS_OK;
+
+ case management::Link::METHOD_BRIDGE :
+ management::ArgsLinkBridge iargs =
+ dynamic_cast<const management::ArgsLinkBridge&>(args);
+
+ // Durable bridges are only valid on durable links
+ if (iargs.i_durable && !durable)
+ return Manageable::STATUS_INVALID_PARAMETER;
+
+ created.push_back(new Bridge(this, channelCounter++,
+ boost::bind(&Link::cancel, this, _1), iargs));
+
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ return Manageable::STATUS_OK;
+ }
+
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
new file mode 100644
index 0000000000..838c3bf696
--- /dev/null
+++ b/cpp/src/qpid/broker/Link.h
@@ -0,0 +1,115 @@
+#ifndef _broker_Link_h
+#define _broker_Link_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "MessageStore.h"
+#include "PersistableConfig.h"
+#include "Bridge.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/ProtocolAccess.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Link.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+ namespace broker {
+
+ using std::string;
+ class LinkRegistry;
+ class Broker;
+ class Connection;
+
+ class Link : public PersistableConfig, public management::Manageable {
+ private:
+ sys::Mutex lock;
+ LinkRegistry* links;
+ const string host;
+ const uint16_t port;
+ const bool useSsl;
+ const bool durable;
+ mutable uint64_t persistenceId;
+ management::Link::shared_ptr mgmtObject;
+ Broker* broker;
+ int state;
+ sys::ProtocolAccess access;
+ uint32_t visitCount;
+ uint32_t currentInterval;
+ bool closing;
+
+ typedef boost::ptr_vector<Bridge> Bridges;
+ Bridges created; // Bridges pending creation
+ Bridges active; // Bridges active
+ Bridges cancelled; // Bridges pending deletion
+ uint channelCounter;
+ boost::shared_ptr<Connection> connection;
+
+ static const int STATE_WAITING = 1;
+ static const int STATE_CONNECTING = 2;
+ static const int STATE_OPERATIONAL = 3;
+
+ static const uint32_t MAX_INTERVAL = 16;
+
+ void setState (int newState);
+ void startConnection(); // Start the IO Connection
+ void established(); // Called when connection is created
+ void closed(int, std::string); // Called when connection goes away
+ void destroy(); // Called when mgmt deletes this link
+ void cancel(Bridge*); // Called by self-cancelling bridge
+ void ioThreadProcessing(); // Called on connection's IO thread by request
+ void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
+
+ public:
+ typedef boost::shared_ptr<Link> shared_ptr;
+
+ Link(LinkRegistry* links,
+ string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ Broker* broker,
+ management::Manageable* parent = 0);
+ virtual ~Link();
+
+ bool isDurable() { return durable; }
+ void maintenanceVisit ();
+
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const string& getName() const;
+
+ static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t ManagementMethod (uint32_t, management::Args&);
+ };
+ }
+}
+
+
+#endif /*!_broker_Link.cpp_h*/
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
new file mode 100644
index 0000000000..6e20a3f7ce
--- /dev/null
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LinkRegistry.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+using std::stringstream;
+using boost::intrusive_ptr;
+
+#define LINK_MAINT_INTERVAL 5
+
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+{
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+}
+
+LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
+ TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC)), links(_links) {}
+
+void LinkRegistry::Periodic::fire ()
+{
+ links.periodicMaintenance ();
+ links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+}
+
+void LinkRegistry::periodicMaintenance ()
+{
+ Mutex::ScopedLock locker(lock);
+ linksToDestroy.clear();
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
+ i->second->maintenanceVisit();
+}
+
+pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i == links.end())
+ {
+ Link::shared_ptr link;
+
+ link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+ links[key] = link;
+ return std::pair<Link::shared_ptr, bool>(link, true);
+ }
+ return std::pair<Link::shared_ptr, bool>(i->second, false);
+}
+
+void LinkRegistry::destroy(const string& host, const uint16_t port)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string key = string(keystream.str());
+
+ LinkMap::iterator i = links.find(key);
+ if (i != links.end())
+ {
+ if (i->second->isDurable() && store)
+ store->destroy(*(i->second));
+ linksToDestroy[key] = i->second;
+ links.erase(i);
+ }
+}
+
+void LinkRegistry::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ store = _store;
+}
+
+MessageStore* LinkRegistry::getStore() const {
+ return store;
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
new file mode 100644
index 0000000000..86d8c3d2f9
--- /dev/null
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -0,0 +1,87 @@
+#ifndef _broker_LinkRegistry_h
+#define _broker_LinkRegistry_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include "Link.h"
+#include "MessageStore.h"
+#include "Timer.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+
+ class Broker;
+ class LinkRegistry {
+
+ // Declare a timer task to manage the establishment of link connections and the
+ // re-establishment of lost link connections.
+ struct Periodic : public TimerTask
+ {
+ LinkRegistry& links;
+
+ Periodic(LinkRegistry& links);
+ virtual ~Periodic() {};
+ void fire();
+ };
+
+ typedef std::map<std::string, Link::shared_ptr> LinkMap;
+ LinkMap links;
+ LinkMap linksToDestroy;
+ qpid::sys::Mutex lock;
+ Broker* broker;
+ Timer timer;
+ management::Manageable* parent;
+ MessageStore* store;
+
+ void periodicMaintenance ();
+
+ public:
+ LinkRegistry (Broker* _broker);
+ std::pair<Link::shared_ptr, bool> declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable);
+ void destroy(const std::string& host, const uint16_t port);
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
+
+ /**
+ * Set the store to use. May only be called once.
+ */
+ void setStore (MessageStore*);
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* getStore() const;
+ };
+}
+}
+
+
+#endif /*!_broker_LinkRegistry_h*/
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 76469ccc50..17fd6aefb8 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -24,6 +24,7 @@
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
+#include "PersistableConfig.h"
#include "RecoveryManager.h"
#include "TransactionalStore.h"
#include "qpid/framing/FieldTable.h"
@@ -87,6 +88,16 @@ public:
const std::string& key, const framing::FieldTable& args) = 0;
/**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config) = 0;
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config) = 0;
+
+ /**
* Stores a messages before it has been enqueued
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index e02c87f069..2544d5d533 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -70,6 +70,16 @@ void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQ
TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
}
+void MessageStoreModule::create(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->create(config));
+}
+
+void MessageStoreModule::destroy(const PersistableConfig& config)
+{
+ TRANSFER_EXCEPTION(store->destroy(config));
+}
+
void MessageStoreModule::recover(RecoveryManager& registry)
{
TRANSFER_EXCEPTION(store->recover(registry));
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index c7ad76d8bb..f4d05e3e0d 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -57,6 +57,8 @@ public:
const std::string& key, const framing::FieldTable& args);
void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ void create(const PersistableConfig& config);
+ void destroy(const PersistableConfig& config);
void recover(RecoveryManager& queues);
void stage(boost::intrusive_ptr<PersistableMessage>& msg);
void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 8936b0440f..401c76f5a2 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -49,7 +49,7 @@ public:
using namespace qpid::broker;
-NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn), nextPersistenceId(1) {}
bool NullMessageStore::init(const Options* /*options*/) {return true;}
@@ -57,6 +57,7 @@ void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable
{
QPID_LOG(info, "Queue '" << queue.getName()
<< "' will not be durable. Persistence not enabled.");
+ queue.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(PersistableQueue&)
@@ -67,6 +68,7 @@ void NullMessageStore::create(const PersistableExchange& exchange, const framing
{
QPID_LOG(info, "Exchange'" << exchange.getName()
<< "' will not be durable. Persistence not enabled.");
+ exchange.setPersistenceId(nextPersistenceId++);
}
void NullMessageStore::destroy(const PersistableExchange& )
@@ -76,6 +78,17 @@ void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&,
void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+void NullMessageStore::create(const PersistableConfig& config)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+ config.setPersistenceId(nextPersistenceId++);
+}
+
+void NullMessageStore::destroy(const PersistableConfig&)
+{
+ QPID_LOG(info, "Persistence not enabled, configuration not stored.");
+}
+
void NullMessageStore::recover(RecoveryManager&)
{
QPID_LOG(info, "Persistence not enabled, no recovery attempted.");
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 96d1c483a2..f06e749ebb 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -37,6 +37,7 @@ class NullMessageStore : public MessageStore
{
std::set<std::string> prepared;
const bool warn;
+ uint64_t nextPersistenceId;
public:
NullMessageStore(bool warn = false);
@@ -57,6 +58,8 @@ public:
const std::string& key, const framing::FieldTable& args);
virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
+ virtual void create(const PersistableConfig& config);
+ virtual void destroy(const PersistableConfig& config);
virtual void recover(RecoveryManager& queues);
virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg);
virtual void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/PersistableConfig.h b/cpp/src/qpid/broker/PersistableConfig.h
new file mode 100644
index 0000000000..914e91ea80
--- /dev/null
+++ b/cpp/src/qpid/broker/PersistableConfig.h
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableConfig_h
+#define _broker_PersistableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface used by general-purpose persistable configuration for
+ * the message store.
+ */
+class PersistableConfig : public Persistable
+{
+public:
+ virtual const std::string& getName() const = 0;
+ virtual ~PersistableConfig() {};
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index f7bad8ebc6..355ebdd81e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -586,7 +586,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
if (mgmtObject != 0 && persistenceId == 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject, _persistenceId);
+ agent->addObject (mgmtObject, _persistenceId, 3);
}
persistenceId = _persistenceId;
}
diff --git a/cpp/src/qpid/broker/RecoverableConfig.h b/cpp/src/qpid/broker/RecoverableConfig.h
new file mode 100644
index 0000000000..838a8582dc
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoverableConfig.h
@@ -0,0 +1,45 @@
+#ifndef _broker_RecoverableConfig_h
+#define _broker_RecoverableConfig_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which configurations are recovered.
+ */
+class RecoverableConfig
+{
+public:
+ typedef boost::shared_ptr<RecoverableConfig> shared_ptr;
+
+ virtual void setPersistenceId(uint64_t id) = 0;
+ virtual ~RecoverableConfig() {};
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index bf1813a093..7dcbe3a2b0 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -25,6 +25,7 @@
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
#include "RecoverableTransaction.h"
+#include "RecoverableConfig.h"
#include "TransactionalStore.h"
#include "qpid/framing/Buffer.h"
@@ -39,6 +40,8 @@ class RecoveryManager{
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn) = 0;
+ virtual RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer) = 0;
+
virtual void recoveryComplete() = 0;
};
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index feb629e118..c6ec573822 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -22,6 +22,7 @@
#include "Message.h"
#include "Queue.h"
+#include "Link.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -34,9 +35,9 @@ using boost::intrusive_ptr;
static const uint8_t BASIC = 1;
static const uint8_t MESSAGE = 2;
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges,
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
DtxManager& _dtxMgr, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -82,6 +83,15 @@ public:
void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
};
+class RecoverableConfigImpl : public RecoverableConfig
+{
+ // TODO: Add links for other config types, consider using super class (PersistableConfig?)
+ Link::shared_ptr link;
+public:
+ RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ void setPersistenceId(uint64_t id);
+};
+
class RecoverableTransactionImpl : public RecoverableTransaction
{
DtxBuffer::shared_ptr buffer;
@@ -125,6 +135,19 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
}
+RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
+{
+ string kind;
+
+ buffer.getShortString (kind);
+ if (kind == "link")
+ {
+ return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
+ }
+
+ return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
+}
+
void RecoveryManagerImpl::recoveryComplete()
{
//TODO (finalise binding setup etc)
@@ -185,6 +208,13 @@ void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
exchange->setPersistenceId(id);
}
+void RecoverableConfigImpl::setPersistenceId(uint64_t id)
+{
+ if (link.get())
+ link->setPersistenceId(id);
+ // TODO: add calls to other types. Consider using a parent class.
+}
+
void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
{
Queue::shared_ptr queue = queues.find(queueName);
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 58ec63926c..cd34d464f5 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -25,6 +25,7 @@
#include "DtxManager.h"
#include "ExchangeRegistry.h"
#include "QueueRegistry.h"
+#include "LinkRegistry.h"
#include "RecoveryManager.h"
namespace qpid {
@@ -33,10 +34,12 @@ namespace broker {
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
+ LinkRegistry& links;
DtxManager& dtxMgr;
const uint64_t stagingThreshold;
public:
- RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
+ RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
+ DtxManager& dtxMgr, uint64_t stagingThreshold);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
@@ -44,6 +47,7 @@ namespace broker {
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
+ RecoverableConfig::shared_ptr recoverConfig(framing::Buffer& buffer);
void recoveryComplete();
};
diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h
index 0d63bd1b3d..65086abec0 100644
--- a/cpp/src/qpid/broker/System.h
+++ b/cpp/src/qpid/broker/System.h
@@ -42,9 +42,6 @@ class System : public management::Manageable
management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
-
- management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
- { return management::Manageable::STATUS_OK; }
};
}}