summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-01-27 20:09:25 +0000
committerAlan Conway <aconway@apache.org>2014-01-27 20:09:25 +0000
commitb710193d135c55357d3333bd55a733a49c361632 (patch)
treee8d357a0687a1b711577ff01a5608b37b14c3504 /qpid/cpp/src
parentecf1ea9d93ffc84ee758efdd89fa2749e3ec7aa7 (diff)
downloadqpid-python-b710193d135c55357d3333bd55a733a49c361632.tar.gz
NO-JIRA: Minor refactor to improve code safety: calling shared_from_this on creation.
Previous anti-pattern: Classes need to call shared_from_this during creation, but can't call it in the ctor so had a separate initiailize function that the user was required to call immediately after the constructor. Possible for user to forget. Improved pattern: Introduce public static create() functions to call constructor and initialize, make constructor and initialize private. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1561828 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h8
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h10
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h11
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h5
11 files changed, 64 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index d33fcdd6b4..26f7baf104 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -71,8 +71,7 @@ void Backup::setBrokerUrl(const Url& brokers) {
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
link = result.first;
- replicator.reset(new BrokerReplicator(haBroker, link));
- replicator->initialize();
+ replicator = BrokerReplicator::create(haBroker, link);
broker.getExchanges().registerExchange(replicator);
}
link->setUrl(brokers); // Outside the lock, once set link doesn't change.
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index e9734170b8..49e7346525 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -274,6 +274,12 @@ template <class EventType> std::string key() {
}
}
+boost::shared_ptr<BrokerReplicator> BrokerReplicator::create(
+ HaBroker& hb, const boost::shared_ptr<broker::Link>& l) {
+ boost::shared_ptr<BrokerReplicator> br(new BrokerReplicator(hb, l));
+ br->initialize();
+ return br;
+}
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
@@ -763,15 +769,10 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
- boost::shared_ptr<QueueReplicator> qr;
- if (TxReplicator::isTxQueue(queue->getName())){
- qr.reset(new TxReplicator(haBroker, queue, link));
- }
- else {
- qr.reset(new QueueReplicator(haBroker, queue, link));
- }
- qr->activate();
- return qr;
+ if (TxReplicator::isTxQueue(queue->getName()))
+ return TxReplicator::create(haBroker, queue, link);
+ else
+ return QueueReplicator::create(haBroker, queue, link);
}
return boost::shared_ptr<QueueReplicator>();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index a6bf02c392..445406ad19 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -75,10 +75,11 @@ class BrokerReplicator : public broker::Exchange,
public:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
- BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
+ static boost::shared_ptr<BrokerReplicator> create(
+ HaBroker&, const boost::shared_ptr<broker::Link>&);
+
~BrokerReplicator();
- void initialize(); // Must be called immediately after constructor.
void shutdown();
// Exchange methods
@@ -98,6 +99,9 @@ class BrokerReplicator : public broker::Exchange,
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
private:
+ BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
+ void initialize(); // Called in create()
+
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 50e99ef527..90ebd423c8 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -176,9 +176,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- shared_ptr<QueueReplicator> qr(
- new QueueReplicator(*this, queue, link));
- qr->activate();
+ shared_ptr<QueueReplicator> qr(QueueReplicator::create(*this, queue, link));
broker.getExchanges().registerExchange(qr);
break;
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 2b97d1dd9c..081fd7b6c7 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -408,9 +408,8 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
const boost::intrusive_ptr<broker::TxBuffer>& txBuffer)
{
- shared_ptr<PrimaryTxObserver> observer(
- new PrimaryTxObserver(*this, haBroker, txBuffer));
- observer->initialize();
+ shared_ptr<PrimaryTxObserver> observer =
+ PrimaryTxObserver::create(*this, haBroker, txBuffer);
txMap[observer->getTxQueue()->getName()] = observer;
return observer;
}
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index eeb3312aec..dc5bf15911 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -82,6 +82,14 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
+boost::shared_ptr<PrimaryTxObserver> PrimaryTxObserver::create(
+ Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx) {
+ boost::shared_ptr<PrimaryTxObserver> pto(new PrimaryTxObserver(p, hb, tx));
+ pto->initialize();
+ return pto;
+}
+
+
PrimaryTxObserver::PrimaryTxObserver(
Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
) :
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 31b2b84b0a..105fee4d40 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -66,11 +66,10 @@ class PrimaryTxObserver : public broker::TransactionObserver,
public boost::enable_shared_from_this<PrimaryTxObserver>
{
public:
- PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
- ~PrimaryTxObserver();
+ static boost::shared_ptr<PrimaryTxObserver> create(
+ Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
- /** Call immediately after constructor, uses shared_from_this. */
- void initialize();
+ ~PrimaryTxObserver();
void enqueue(const QueuePtr&, const broker::Message&);
void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId);
@@ -96,6 +95,9 @@ class PrimaryTxObserver : public broker::TransactionObserver,
ENDED ///< Commit or rollback sent, local transaction ended.
};
+ PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
+ void initialize();
+
void checkState(State expect, const std::string& msg);
void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 1300819eb7..ffb40fdf22 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -108,6 +108,14 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver {
boost::shared_ptr<QueueReplicator> queueReplicator;
};
+boost::shared_ptr<QueueReplicator> QueueReplicator::create(
+ HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l)
+{
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(hb, q, l));
+ qr->initialize();
+ return qr;
+}
+
QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
@@ -144,9 +152,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
QueueReplicator::~QueueReplicator() {}
-// This must be called immediately after the constructor.
-// It has to be separate so we can call shared_from_this().
-void QueueReplicator::activate() {
+void QueueReplicator::initialize() {
Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index a86355f194..22cd13a0a8 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -67,13 +67,11 @@ class QueueReplicator : public broker::Exchange,
/*** Copy QueueReplicators from the registry */
static void copy(broker::ExchangeRegistry&, Vector& result);
- QueueReplicator(HaBroker&,
- boost::shared_ptr<broker::Queue> q,
- boost::shared_ptr<broker::Link> l);
+ static boost::shared_ptr<QueueReplicator> create(
+ HaBroker&, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate(); // Must be called immediately after constructor.
void disconnect(); // Called when we are disconnected from the primary.
std::string getType() const;
@@ -97,6 +95,11 @@ class QueueReplicator : public broker::Exchange,
typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
+ QueueReplicator(
+ HaBroker&, boost::shared_ptr<broker::Queue>, boost::shared_ptr<broker::Link>);
+
+ void initialize(); // Called as part of create()
+
virtual void deliver(const broker::Message&);
virtual void destroy(); // Called when the queue is destroyed.
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 9ae9dcce36..7ff03b5f92 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -70,6 +70,16 @@ string TxReplicator::getTxId(const string& q) {
string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
+boost::shared_ptr<TxReplicator> TxReplicator::create(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link)
+{
+ boost::shared_ptr<TxReplicator> tr(new TxReplicator(hb, txQueue, link));
+ tr->initialize();
+ return tr;
+}
+
TxReplicator::TxReplicator(
HaBroker& hb,
const boost::shared_ptr<broker::Queue>& txQueue,
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 9d80ecb8d3..7f1256699a 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -58,7 +58,9 @@ class TxReplicator : public QueueReplicator {
static bool isTxQueue(const std::string& queue);
static std::string getTxId(const std::string& queue);
- TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+ static boost::shared_ptr<TxReplicator> create(
+ HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+
~TxReplicator();
std::string getType() const;
@@ -78,6 +80,7 @@ class TxReplicator : public QueueReplicator {
typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
+ TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&);
void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
void dequeue(const std::string& data, sys::Mutex::ScopedLock&);