diff options
| author | Alan Conway <aconway@apache.org> | 2013-09-09 17:08:39 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-09-09 17:08:39 +0000 |
| commit | 54f59af8956c9df2349c0030a3104ffc605b46c8 (patch) | |
| tree | 8df90f9f8cabac07ae33e4b7558e82367669d2d6 /qpid/cpp/src | |
| parent | eb985ca06c420450dfe04bf1e15124053d819e4b (diff) | |
| download | qpid-python-54f59af8956c9df2349c0030a3104ffc605b46c8.tar.gz | |
QPID-4327: HA support for TX transactions - fix TX error messages.
- Ignore un-replicated queues when replicating transactions.
- Clean up cancel logic in QueueReplicator, causing "no such subscription" errors.
- Remove unnecessary exchange delete warnings
- ha_test.py: Shorter timeout for starting cluster brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1521192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 33 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 15 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 11 |
10 files changed, 52 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 3cfdc40b03..1e09caedb6 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -560,11 +560,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = exchanges.find(name); - if (!exchange) { - QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); - } else if (!replicationTest.getLevel(*exchange)) { - QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); - } else { + if (exchange && replicationTest.getLevel(*exchange)) { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); if (exchangeTracker.get()) exchangeTracker->event(name); deleteExchange(name); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 4d6a5ee51e..21e41c0f8c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -203,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const { } void HaBroker::shutdown(const std::string& message) { - QPID_LOG(critical, message); + QPID_LOG(critical, "Shutting down: " << message); broker.shutdown(); throw Exception(message); } diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 4c7dc2ef0d..04eede6fe0 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -79,7 +79,9 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : - haBroker(hb), broker(hb.getBroker()), id(true), + haBroker(hb), broker(hb.getBroker()), + replicationTest(hb.getSettings().replicateDefault.get()), + id(true), exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), failed(false), ended(false) { @@ -98,8 +100,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : pair<QueuePtr, bool> result = broker.getQueues().declare( - TRANSACTION_REPLICATOR_PREFIX+id.str(), - QueueSettings(/*durable*/false, /*autodelete*/true)); + exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true)); assert(result.second); txQueue = result.first; txQueue->deliver(TxMembersEvent(members).message()); @@ -109,25 +110,35 @@ PrimaryTxObserver::~PrimaryTxObserver() {} void PrimaryTxObserver::initialize() { - broker.getExchanges().registerExchange( - boost::shared_ptr<Exchange>(new Exchange(shared_from_this()))); + boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this())); + FieldTable args = ex->getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg. + broker.getExchanges().registerExchange(ex); } void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) { sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); - enqueues[q] += m.getReplicationId(); - txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); - txQueue->deliver(m); + if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. + QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); + enqueues[q] += m.getReplicationId(); + txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); + txQueue->deliver(m); + } } void PrimaryTxObserver::dequeue( const QueuePtr& q, QueuePosition pos, ReplicationId id) { sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); - txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); + if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. + QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); + txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); + } + else { + QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: " + << LogMessageId(*q, pos, id)); + } } void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 6b75becc00..2a378e1413 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -23,7 +23,7 @@ */ #include "types.h" - +#include "ReplicationTest.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" @@ -97,6 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, std::string logPrefix; HaBroker& haBroker; broker::Broker& broker; + ReplicationTest replicationTest; types::Uuid id; std::string exchangeName; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index b4bbb3a0c4..22af7284a8 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -112,7 +112,6 @@ QueueReplicator::QueueReplicator(HaBroker& hb, logPrefix("Backup of "+q->getName()+": "), subscribed(false), settings(hb.getSettings()), - destroyed(false), nextId(0), maxId(0) { args.setString(QPID_REPLICATE, printable(NONE).str()); @@ -181,8 +180,7 @@ void QueueReplicator::destroy() { boost::shared_ptr<Bridge> bridge2; // To call outside of lock { Mutex::ScopedLock l(lock); - if (destroyed) return; - destroyed = true; + if (!queue) return; // Already destroyed QPID_LOG(debug, logPrefix << "Destroyed"); bridge2 = bridge; // call close outside the lock. // Need to drop shared pointers to avoid pointer cycles keeping this in memory. @@ -197,7 +195,7 @@ void QueueReplicator::destroy() { // Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) { Mutex::ScopedLock l(lock); - if (destroyed) return; // Already destroyed + if (!queue) return; // Already destroyed sessionHandler = &sessionHandler_; AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -225,14 +223,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } -void QueueReplicator::cancel(Mutex::ScopedLock&) { - if (sessionHandler) { - // Cancel the replicating subscription. - AMQP_ServerProxy peer(sessionHandler->out); - peer.getMessage().cancel(getName()); - } -} - namespace { template <class T> T decodeContent(Message& m) { std::string content = m.getContent(); @@ -259,7 +249,7 @@ void QueueReplicator::route(Deliverable& deliverable) { try { Mutex::ScopedLock l(lock); - if (destroyed) return; + if (!queue) return; // Already destroyed broker::Message& message(deliverable.getMessage()); string key(message.getRoutingKey()); if (!isEventKey(message.getRoutingKey())) { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 06248d32aa..8e317f01f9 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -94,7 +94,6 @@ class QueueReplicator : public broker::Exchange, virtual void deliver(const broker::Message&); virtual void destroy(); // Called when the queue is destroyed. - void cancel(sys::Mutex::ScopedLock&); sys::Mutex lock; HaBroker& haBroker; @@ -122,7 +121,6 @@ class QueueReplicator : public broker::Exchange, bool subscribed; const Settings& settings; - bool destroyed; PositionMap positions; ReplicationIdSet idSet; // Set of replicationIds on the queue. ReplicationId nextId; // ID for next message to arrive. diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index 7d44d82a21..c157385ce6 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -41,6 +41,13 @@ namespace ha { /** * Test whether something is replicated, taking into account the * default replication level. + * + * The primary uses a ReplicationTest with default based on configuration + * settings, and marks objects to be replicated with an explict replication + * argument. + * + * The backup uses a default of NONE, so it always accepts what the primary has + * marked on the object. */ class ReplicationTest { diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 10181c15df..dcc5376d97 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -215,21 +215,24 @@ void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { end(l); } -void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { +void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) { TxMembersEvent e; decodeStr(data, e); QPID_LOG(debug, logPrefix << "Members: " << e.members); if (!e.members.count(haBroker.getMembership().getSelf())) { QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating"); - // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. - haBroker.getBroker().deleteQueue( - getQueue()->getName(), haBroker.getUserId(), string()); + end(l); } } -void TxReplicator::end(sys::Mutex::ScopedLock& l) { +void TxReplicator::end(sys::Mutex::ScopedLock&) { complete = true; - cancel(l); + if (!getQueue()) return; // Already destroyed + // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. + // Need to do this now to cancel the subscription to the primary tx-queue + // which informs the primary that we have completed the transaction. + haBroker.getBroker().deleteQueue( + getQueue()->getName(), haBroker.getUserId(), string()); } void TxReplicator::destroy() { diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 1c51673763..3280540db7 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -125,7 +125,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", "--log-enable=debug+:acl::", + "--log-enable=debug+:ha::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -323,7 +323,7 @@ class HaCluster(object): ha_port = self._ports[i] b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, args=args, **self.kwargs) - b.ready() + b.ready(timeout=5) return b def start(self): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 79f068641d..17a60a2c76 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -562,8 +562,7 @@ class ReplicationTests(HaBrokerTest): return acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") - # Verify that replication works with auth=yes and HA user has at least the following - # privileges: + # Minimum set of privileges required for the HA user. aclf.write(""" # HA user acl allow zag@QPID access queue @@ -592,14 +591,14 @@ acl deny all all client_credentials=Credentials("zag", "zag", "PLAIN")) c = cluster[0].connect(username="zig", password="zig") s0 = c.session(); - s0.receiver("q;{create:always}") - s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + s0.sender("q;{create:always}") + s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") s0.sender("ex").send("foo"); s1 = c.session(transactional=True) - s1.sender("ex").send("tx"); + s1.sender("ex").send("foo-tx"); cluster[1].assert_browse_backup("q", ["foo"]) s1.commit() - cluster[1].assert_browse_backup("q", ["foo", "tx"]) + cluster[1].assert_browse_backup("q", ["foo", "foo-tx"]) def test_alternate_exchange(self): """Verify that alternate-exchange on exchanges and queues is propagated |
