summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h8
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp7
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark9
4 files changed, 27 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 34916c2d1e..befaaa31ff 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -59,7 +59,13 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+}
+// This must be separate from the constructor so we can call shared_from_this.
+void QueueReplicator::activate() {
+ // Take a reference to myself to ensure not deleted before initializeBridge
+ // is called.
+ self = shared_from_this();
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -72,19 +78,19 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
"", // excludes
false, // dynamic
0, // sync?
+ // Include shared_ptr to self to ensure we not deleted before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
);
}
QueueReplicator::~QueueReplicator() {
- // FIXME aconway 2011-12-21: causes race condition? Restore.
-// queue->getBroker()->getLinks().destroy(
-// link->getHost(), link->getPort(), queue->getName(), getName(), string());
+ queue->getBroker()->getLinks().destroy(
+ link->getHost(), link->getPort(), queue->getName(), getName(), string());
}
// Called in a broker connection thread when the bridge is created.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
-{
+// shared_ptr to self is just to ensure we are still in memory.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
@@ -107,6 +113,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
+ // Reset self reference so this will be deleted when all external refs are gone.
+ self.reset();
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 518e97f754..5bdafb83c8 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -23,6 +23,7 @@
*/
#include "qpid/broker/Exchange.h"
#include "qpid/framing/SequenceSet.h"
+#include <boost/enable_shared_from_this.hpp>
#include <iosfwd>
namespace qpid {
@@ -47,7 +48,8 @@ namespace ha {
*
* THREAD UNSAFE: Only called in the connection thread of the source queue.
*/
-class QueueReplicator : public broker::Exchange
+class QueueReplicator : public broker::Exchange,
+ public boost::enable_shared_from_this<QueueReplicator>
{
public:
static const std::string DEQUEUE_EVENT_KEY;
@@ -57,6 +59,8 @@ class QueueReplicator : public broker::Exchange
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
+ void activate();
+
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
@@ -70,7 +74,7 @@ class QueueReplicator : public broker::Exchange
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
-
+ boost::shared_ptr<QueueReplicator> self;
friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
};
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 4a192cd91e..416f516f7b 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -295,9 +295,8 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
name,
values[USER].asString(),
values[RHOST].asString());
- // FIXME aconway 2011-12-21: casuses race conditions? Restore.
-// // Also delete the QueueReplicator exchange for this queue.
-// broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+ // Delete the QueueReplicator exchange for this queue.
+ broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
}
}
@@ -449,9 +448,9 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ qr->activate();
broker.getExchanges().registerExchange(qr);
}
}
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index fb0982bf83..d836ed709c 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -30,7 +30,7 @@ RECEIVERS="-r 3"
BROKERS= # Local broker
CLIENT_HOSTS= # No ssh, all clients are local
-while getopts "m:f:n:b:q:s:r:c:txyv" opt; do
+while getopts "m:f:n:b:q:s:r:c:txyv-" opt; do
case $opt in
m) MESSAGES="-m $OPTARG";;
f) FLOW="--flow-control $OPTARG";;
@@ -44,15 +44,16 @@ while getopts "m:f:n:b:q:s:r:c:txyv" opt; do
x) SAVE_RECEIVED="--save-received";;
y) NO_DELETE="--no-delete";;
v) OPTS="--verbose";;
+ -) break ;;
*) echo "Unknown option"; exit 1;;
esac
done
+shift $(($OPTIND-1))
+
REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
OPTS="$OPTS --create-option $REPLICATE"
-run_test "Benchmark:" qpid-cpp-benchmark $OPTS
-
-
+run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"