summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-09 17:08:39 +0000
committerAlan Conway <aconway@apache.org>2013-09-09 17:08:39 +0000
commit54f59af8956c9df2349c0030a3104ffc605b46c8 (patch)
tree8df90f9f8cabac07ae33e4b7558e82367669d2d6 /qpid/cpp/src
parenteb985ca06c420450dfe04bf1e15124053d819e4b (diff)
downloadqpid-python-54f59af8956c9df2349c0030a3104ffc605b46c8.tar.gz
QPID-4327: HA support for TX transactions - fix TX error messages.
- Ignore un-replicated queues when replicating transactions. - Clean up cancel logic in QueueReplicator, causing "no such subscription" errors. - Remove unnecessary exchange delete warnings - ha_test.py: Shorter timeout for starting cluster brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1521192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp33
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h7
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp15
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py11
10 files changed, 52 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 3cfdc40b03..1e09caedb6 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -560,11 +560,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
- if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.getLevel(*exchange)) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
- } else {
+ if (exchange && replicationTest.getLevel(*exchange)) {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 4d6a5ee51e..21e41c0f8c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -203,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, message);
+ QPID_LOG(critical, "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 4c7dc2ef0d..04eede6fe0 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -79,7 +79,9 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true),
+ haBroker(hb), broker(hb.getBroker()),
+ replicationTest(hb.getSettings().replicateDefault.get()),
+ id(true),
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
failed(false), ended(false)
{
@@ -98,8 +100,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
pair<QueuePtr, bool> result =
broker.getQueues().declare(
- TRANSACTION_REPLICATOR_PREFIX+id.str(),
- QueueSettings(/*durable*/false, /*autodelete*/true));
+ exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true));
assert(result.second);
txQueue = result.first;
txQueue->deliver(TxMembersEvent(members).message());
@@ -109,25 +110,35 @@ PrimaryTxObserver::~PrimaryTxObserver() {}
void PrimaryTxObserver::initialize() {
- broker.getExchanges().registerExchange(
- boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
+ boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
+ broker.getExchanges().registerExchange(ex);
}
void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
- enqueues[q] += m.getReplicationId();
- txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
- txQueue->deliver(m);
+ if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+ QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+ enqueues[q] += m.getReplicationId();
+ txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
+ txQueue->deliver(m);
+ }
}
void PrimaryTxObserver::dequeue(
const QueuePtr& q, QueuePosition pos, ReplicationId id)
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
- txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+ if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+ QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+ }
+ else {
+ QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
+ << LogMessageId(*q, pos, id));
+ }
}
void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 6b75becc00..2a378e1413 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -23,7 +23,7 @@
*/
#include "types.h"
-
+#include "ReplicationTest.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
@@ -97,6 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
std::string logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
+ ReplicationTest replicationTest;
types::Uuid id;
std::string exchangeName;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index b4bbb3a0c4..22af7284a8 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -112,7 +112,6 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
logPrefix("Backup of "+q->getName()+": "),
subscribed(false),
settings(hb.getSettings()),
- destroyed(false),
nextId(0), maxId(0)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
@@ -181,8 +180,7 @@ void QueueReplicator::destroy() {
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
- if (destroyed) return;
- destroyed = true;
+ if (!queue) return; // Already destroyed
QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
@@ -197,7 +195,7 @@ void QueueReplicator::destroy() {
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
Mutex::ScopedLock l(lock);
- if (destroyed) return; // Already destroyed
+ if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -225,14 +223,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
-void QueueReplicator::cancel(Mutex::ScopedLock&) {
- if (sessionHandler) {
- // Cancel the replicating subscription.
- AMQP_ServerProxy peer(sessionHandler->out);
- peer.getMessage().cancel(getName());
- }
-}
-
namespace {
template <class T> T decodeContent(Message& m) {
std::string content = m.getContent();
@@ -259,7 +249,7 @@ void QueueReplicator::route(Deliverable& deliverable)
{
try {
Mutex::ScopedLock l(lock);
- if (destroyed) return;
+ if (!queue) return; // Already destroyed
broker::Message& message(deliverable.getMessage());
string key(message.getRoutingKey());
if (!isEventKey(message.getRoutingKey())) {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 06248d32aa..8e317f01f9 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -94,7 +94,6 @@ class QueueReplicator : public broker::Exchange,
virtual void deliver(const broker::Message&);
virtual void destroy(); // Called when the queue is destroyed.
- void cancel(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -122,7 +121,6 @@ class QueueReplicator : public broker::Exchange,
bool subscribed;
const Settings& settings;
- bool destroyed;
PositionMap positions;
ReplicationIdSet idSet; // Set of replicationIds on the queue.
ReplicationId nextId; // ID for next message to arrive.
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index 7d44d82a21..c157385ce6 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -41,6 +41,13 @@ namespace ha {
/**
* Test whether something is replicated, taking into account the
* default replication level.
+ *
+ * The primary uses a ReplicationTest with default based on configuration
+ * settings, and marks objects to be replicated with an explict replication
+ * argument.
+ *
+ * The backup uses a default of NONE, so it always accepts what the primary has
+ * marked on the object.
*/
class ReplicationTest
{
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 10181c15df..dcc5376d97 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -215,21 +215,24 @@ void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
TxMembersEvent e;
decodeStr(data, e);
QPID_LOG(debug, logPrefix << "Members: " << e.members);
if (!e.members.count(haBroker.getMembership().getSelf())) {
QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- haBroker.getBroker().deleteQueue(
- getQueue()->getName(), haBroker.getUserId(), string());
+ end(l);
}
}
-void TxReplicator::end(sys::Mutex::ScopedLock& l) {
+void TxReplicator::end(sys::Mutex::ScopedLock&) {
complete = true;
- cancel(l);
+ if (!getQueue()) return; // Already destroyed
+ // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+ // Need to do this now to cancel the subscription to the primary tx-queue
+ // which informs the primary that we have completed the transaction.
+ haBroker.getBroker().deleteQueue(
+ getQueue()->getName(), haBroker.getUserId(), string());
}
void TxReplicator::destroy() {
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 1c51673763..3280540db7 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -125,7 +125,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
+ "--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -323,7 +323,7 @@ class HaCluster(object):
ha_port = self._ports[i]
b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
args=args, **self.kwargs)
- b.ready()
+ b.ready(timeout=5)
return b
def start(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 79f068641d..17a60a2c76 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -562,8 +562,7 @@ class ReplicationTests(HaBrokerTest):
return
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
- # Verify that replication works with auth=yes and HA user has at least the following
- # privileges:
+ # Minimum set of privileges required for the HA user.
aclf.write("""
# HA user
acl allow zag@QPID access queue
@@ -592,14 +591,14 @@ acl deny all all
client_credentials=Credentials("zag", "zag", "PLAIN"))
c = cluster[0].connect(username="zig", password="zig")
s0 = c.session();
- s0.receiver("q;{create:always}")
- s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ s0.sender("q;{create:always}")
+ s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
s0.sender("ex").send("foo");
s1 = c.session(transactional=True)
- s1.sender("ex").send("tx");
+ s1.sender("ex").send("foo-tx");
cluster[1].assert_browse_backup("q", ["foo"])
s1.commit()
- cluster[1].assert_browse_backup("q", ["foo", "tx"])
+ cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated