diff options
author | Alan Conway <aconway@apache.org> | 2015-09-03 18:59:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2015-09-03 18:59:05 +0000 |
commit | 14de0a97b80cc0f63208b61654dd0348fd3da8b1 (patch) | |
tree | a9f3a2bcdda814d6a36ab1981ae528272d460523 | |
parent | 492f7a78b8fd2ed7dd4e08184f42b7496aa8fed1 (diff) | |
download | qpid-python-14de0a97b80cc0f63208b61654dd0348fd3da8b1.tar.gz |
QPID-5855 - Simplified HA transaction logic.
Removed complex and incorrect HA+TX logic, reverted to the following limitation:
You can use transactions in a HA cluster, but there are limitations on the
transactional guarantees. Transactions function normally with the *primary*
broker but replication to the backups is not coverted by the atomic guarantee.
The following situations are all safe:
- Client rolls back a transaction.
- Client successfully commits a transaction.
- Primary fails during a transaction *before* the client sends a commit.
- Transaction contains only one message.
The problem case is when all of the following occur:
- transaction contains multiple actions (enqueues or dequeues)
- primary fails between client sending commit and receiving commit-complete.
In this case it is possible that only part of the transaction was replicated to
the backups, so on fail-over partial transaction results may be visible.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1701109 13f79535-47bb-0310-9956-ffa450edef68
25 files changed, 36 insertions, 1368 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt index 3b1890f976..f9e0a5745e 100644 --- a/qpid/cpp/CMakeLists.txt +++ b/qpid/cpp/CMakeLists.txt @@ -60,7 +60,7 @@ enable_testing() include (CTest) if (MSVC) - # Change warning C4996 from level 1 to level 4. These are real and shouldn't + # Chaxnge warning C4996 from level 1 to level 4. These are real and shouldn't # be completely ignored, but they're pretty well checked out and will throw # a run-time error if violated. # "warning C4996: 'std::equal': Function call with parameters that may diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index cddab6b5dd..41dbb76b16 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -552,8 +552,6 @@ if (BUILD_HA) qpid/ha/Primary.cpp qpid/ha/Primary.h qpid/ha/PrimaryQueueLimits.h - qpid/ha/PrimaryTxObserver.cpp - qpid/ha/PrimaryTxObserver.h qpid/ha/QueueGuard.cpp qpid/ha/QueueGuard.h qpid/ha/QueueReplicator.cpp @@ -571,10 +569,6 @@ if (BUILD_HA) qpid/ha/StandAlone.h qpid/ha/StatusCheck.cpp qpid/ha/StatusCheck.h - qpid/ha/TxReplicatingSubscription.cpp - qpid/ha/TxReplicatingSubscription.h - qpid/ha/TxReplicator.cpp - qpid/ha/TxReplicator.h qpid/ha/types.cpp qpid/ha/types.h ) diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 9c215d197f..3873e41cc9 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -74,7 +74,7 @@ QueueFlowLimit::QueueFlowLimit(const std::string& _queueName, flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), broker(0) { - QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount + QPID_LOG(debug, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount << ", flowResumeCount=" << flowResumeCount << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); } diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp index 518c2fa9d0..4682c1f917 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp @@ -411,7 +411,7 @@ bool Connection::doOutput() { } void Connection::sendHeartbeat() { - adapter.heartbeat(); + requestIOProcessing(boost::bind(&ConnectionHandler::heartbeat, &adapter)); } void Connection::closeChannel(uint16_t id) { diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index 6ff624ef75..0ae4d8356d 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -263,8 +263,7 @@ void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller, const qpid::sys::Socket& s) { if (tcpNoDelay) { s.setTcpNoDelay(); - QPID_LOG(info, - "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } async->init(aio, brokerTimer, maxNegotiateTime); diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp index 7d41b48abd..3bfde0656b 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp @@ -51,7 +51,7 @@ void ConnectionSettings::configureSocket(qpid::sys::Socket& socket) const { if (tcpNoDelay) { socket.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY"); + QPID_LOG(debug, "Set TCP_NODELAY"); } } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index a62080932d..d664f13893 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -21,7 +21,6 @@ #include "BrokerReplicator.h" #include "HaBroker.h" #include "QueueReplicator.h" -#include "TxReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/Queue.h" @@ -772,10 +771,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( const boost::shared_ptr<Queue>& queue) { if (replicationTest.getLevel(*queue) == ALL) { - if (TxReplicator::isTxQueue(queue->getName())) - return TxReplicator::create(haBroker, queue, link); - else - return QueueReplicator::create(haBroker, queue, link); + return QueueReplicator::create(haBroker, queue, link); } return boost::shared_ptr<QueueReplicator>(); } @@ -886,10 +882,6 @@ void BrokerReplicator::disconnectedQueueReplicator( const boost::shared_ptr<QueueReplicator>& qr) { qr->disconnect(); - if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { - // Transactions are aborted on failover so clean up tx-queues - deleteQueue(qr->getQueue()->getName()); - } } // Called by ConnectionObserver::disconnected, disconnected from the network side. diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp index ff336d0b2b..8c1b52ea7a 100644 --- a/qpid/cpp/src/qpid/ha/Event.cpp +++ b/qpid/cpp/src/qpid/ha/Event.cpp @@ -44,14 +44,6 @@ bool isEventKey(const std::string& key) { const string DequeueEvent::KEY(QPID_HA+"de"); const string IdEvent::KEY(QPID_HA+"id"); -const string TxEnqueueEvent::KEY(QPID_HA+"txenq"); -const string TxDequeueEvent::KEY(QPID_HA+"txdeq"); -const string TxPrepareEvent::KEY(QPID_HA+"txpre"); -const string TxCommitEvent::KEY(QPID_HA+"txcom"); -const string TxRollbackEvent::KEY(QPID_HA+"txrb"); -const string TxPrepareOkEvent::KEY(QPID_HA+"txok"); -const string TxPrepareFailEvent::KEY(QPID_HA+"txno"); -const string TxBackupsEvent::KEY(QPID_HA+"txmem"); broker::Message makeMessage( const string& data, const string& destination, const string& routingKey) diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h index 7b96e36f64..308673657c 100644 --- a/qpid/cpp/src/qpid/ha/Event.h +++ b/qpid/cpp/src/qpid/ha/Event.h @@ -94,100 +94,6 @@ struct IdEvent : public EventBase<IdEvent> { void print(std::ostream& o) const { o << id; } }; -//// Transaction events - -struct TxEnqueueEvent : public EventBase<TxEnqueueEvent> { - static const std::string KEY; - framing::LongString queue; - ReplicationId id; - - TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId()) - : queue(q), id(i) {} - void encode(framing::Buffer& b) const { b.put(queue); b.put(id); } - void decode(framing::Buffer& b) { b.get(queue); b.get(id); } - virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); } - void print(std::ostream& o) const { o << queue.value << " " << id; } -}; - -struct TxDequeueEvent : public EventBase<TxDequeueEvent> { - static const std::string KEY; - framing::LongString queue; - ReplicationId id; - - TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) : - queue(q), id(r) {} - void encode(framing::Buffer& b) const { b.put(queue);b.put(id); } - void decode(framing::Buffer& b) { b.get(queue);b.get(id); } - virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); } - void print(std::ostream& o) const { o << queue.value << " " << id; } -}; - -struct TxPrepareEvent : public EventBase<TxPrepareEvent> { - static const std::string KEY; - void encode(framing::Buffer&) const {} - void decode(framing::Buffer&) {} - virtual size_t encodedSize() const { return 0; } - void print(std::ostream&) const {} -}; - -struct TxCommitEvent : public EventBase<TxCommitEvent> { - static const std::string KEY; - void encode(framing::Buffer&) const {} - void decode(framing::Buffer&) {} - virtual size_t encodedSize() const { return 0; } - void print(std::ostream&) const {} -}; - -struct TxRollbackEvent : public EventBase<TxRollbackEvent> { - static const std::string KEY; - void encode(framing::Buffer&) const {} - void decode(framing::Buffer&) {} - virtual size_t encodedSize() const { return 0; } - void print(std::ostream&) const {} -}; - -struct TxPrepareOkEvent : public EventBase<TxPrepareOkEvent> { - static const std::string KEY; - types::Uuid broker; - TxPrepareOkEvent(const types::Uuid& b=types::Uuid()) : broker(b) {} - - void encode(framing::Buffer& b) const { - b.putRawData(broker.data(), broker.size()); - } - - void decode(framing::Buffer& b) { - std::string s; - b.getRawData(s, broker.size()); - broker = types::Uuid(&s[0]); - } - virtual size_t encodedSize() const { return broker.size(); } - void print(std::ostream& o) const { o << broker; } -}; - -struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> { - static const std::string KEY; - types::Uuid broker; - TxPrepareFailEvent(const types::Uuid& b=types::Uuid()) : broker(b) {} - void encode(framing::Buffer& b) const { b.putRawData(broker.data(), broker.size()); } - void decode(framing::Buffer& b) { - std::string s; - b.getRawData(s, broker.size()); - broker = types::Uuid(&s[0]); - } - virtual size_t encodedSize() const { return broker.size(); } - void print(std::ostream& o) const { o << broker; } -}; - -struct TxBackupsEvent : public EventBase<TxBackupsEvent> { - static const std::string KEY; - UuidSet backups; - TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {} - void encode(framing::Buffer& b) const { b.put(backups); } - void decode(framing::Buffer& b) { b.get(backups); } - size_t encodedSize() const { return backups.encodedSize(); } - void print(std::ostream& o) const { o << backups; } -}; - }} // namespace qpid::ha #endif /*!QPID_HA_EVENT_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 870e4723b2..ca92ad77dc 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -27,11 +27,11 @@ #include "RemoteBackup.h" #include "ConnectionObserver.h" #include "QueueReplicator.h" -#include "PrimaryTxObserver.h" #include "qpid/assert.h" #include "qpid/broker/Broker.h" #include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionHandlerObserver.h" #include "qpid/framing/FieldTable.h" @@ -77,8 +77,6 @@ class PrimaryBrokerObserver : public broker::BrokerObserver void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } - void startTx(const intrusive_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); } - void startDtx(const intrusive_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); } private: Primary& primary; @@ -268,38 +266,6 @@ void Primary::addReplica(ReplicatingSubscription& rs) { replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; } -void Primary::skipEnqueues( - const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids) -{ - sys::Mutex::ScopedLock l(lock); - ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); - if (i != replicas.end()) i->second->skipEnqueues(ids); -} - -void Primary::skipDequeues( - const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids) -{ - sys::Mutex::ScopedLock l(lock); - ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); - if (i != replicas.end()) i->second->skipDequeues(ids); -} - -// Called from ReplicatingSubscription::cancel -void Primary::removeReplica(const ReplicatingSubscription& rs) { - boost::shared_ptr<PrimaryTxObserver> tx; - { - sys::Mutex::ScopedLock l(lock); - replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); - TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); - if (i != txMap.end()) tx = i->second.lock(); - } - if (tx) tx->cancel(rs); // Outside of lock. -} - // NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { // Set replication argument. @@ -477,22 +443,4 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } -shared_ptr<PrimaryTxObserver> Primary::makeTxObserver( - const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) -{ - shared_ptr<PrimaryTxObserver> observer = - PrimaryTxObserver::create(*this, haBroker, txBuffer); - sys::Mutex::ScopedLock l(lock); - txMap[observer->getTxQueue()->getName()] = observer; - return observer; -} - -void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) { - txBuffer->setObserver(makeTxObserver(txBuffer)); -} - -void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) { - QPID_LOG(warning, "DTX transactions in a HA cluster are not yet atomic"); -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 84d714fc01..58e6e684ea 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -44,8 +44,6 @@ class Connection; class ConnectionObserver; class BrokerObserver; class SessionHandlerObserver; -class TxBuffer; -class DtxBuffer; } namespace sys { @@ -58,7 +56,6 @@ class ReplicatingSubscription; class RemoteBackup; class QueueGuard; class Membership; -class PrimaryTxObserver; /** * State associated with a primary broker: @@ -87,25 +84,12 @@ class Primary : public Role void readyReplica(const ReplicatingSubscription&); void addReplica(ReplicatingSubscription&); - void removeReplica(const ReplicatingSubscription&); - - /** Skip replication of ids to queue on backup. */ - void skipEnqueues(const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids); - - /** Skip replication of dequeue of ids to queue on backup. */ - void skipDequeues(const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids); // Called via BrokerObserver void queueCreate(const QueuePtr&); void queueDestroy(const QueuePtr&); void exchangeCreate(const ExchangePtr&); void exchangeDestroy(const ExchangePtr&); - void startTx(const boost::intrusive_ptr<broker::TxBuffer>&); - void startDtx(const boost::intrusive_ptr<broker::DtxBuffer>&); // Called via ConnectionObserver void opened(broker::Connection& connection); @@ -126,9 +110,6 @@ class Primary : public Role typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*, Hasher<UuidQueue> > ReplicaMap; - // Map of PrimaryTxObservers by tx-queue name - typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap; - RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&); void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&); @@ -136,8 +117,6 @@ class Primary : public Role void checkReady(RemoteBackupPtr); void setCatchupQueues(const RemoteBackupPtr&, bool createGuards); void deduplicate(); - boost::shared_ptr<PrimaryTxObserver> makeTxObserver( - const boost::intrusive_ptr<broker::TxBuffer>&); mutable sys::Mutex lock; HaBroker& haBroker; @@ -161,7 +140,6 @@ class Primary : public Role boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver; boost::intrusive_ptr<sys::TimerTask> timerTask; ReplicaMap replicas; - TxMap txMap; PrimaryQueueLimits queueLimits; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp deleted file mode 100644 index 56815ef89d..0000000000 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ /dev/null @@ -1,307 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Event.h" -#include "HaBroker.h" -#include "Primary.h" -#include "PrimaryTxObserver.h" -#include "QueueGuard.h" -#include "RemoteBackup.h" -#include "ReplicatingSubscription.h" - -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/reply_exceptions.h" -#include <boost/lexical_cast.hpp> -#include <algorithm> - -namespace qpid { -namespace framing { -class FieldTable; -} -namespace ha { - -using namespace std; -using namespace sys; -using namespace broker; -using namespace framing; -using types::Uuid; - -// Exchange to receive prepare OK events. -class PrimaryTxObserver::Exchange : public broker::Exchange { - public: - Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) : - broker::Exchange(tx_->getExchangeName()), - tx(tx_) - { - args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg. - dispatch[TxPrepareOkEvent::KEY] = - boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1); - dispatch[TxPrepareFailEvent::KEY] = - boost::bind(&PrimaryTxObserver::txPrepareFailEvent, tx, _1); - } - - void route(Deliverable& deliverable) { - const broker::Message& message(deliverable.getMessage()); - DispatchMap::iterator i = dispatch.find(message.getRoutingKey()); - if (i != dispatch.end()) i->second(message.getContent()); - } - - bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } - bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } - bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; } - bool hasBindings() { return false; } - string getType() const { return TYPE_NAME; } - - private: - static const string TYPE_NAME; - typedef boost::function<void(const std::string&)> DispatchFn; - typedef unordered_map<std::string, DispatchFn> DispatchMap; - - DispatchMap dispatch; - boost::shared_ptr<PrimaryTxObserver> tx; -}; - -const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); - -boost::shared_ptr<PrimaryTxObserver> PrimaryTxObserver::create( - Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx) { - boost::shared_ptr<PrimaryTxObserver> pto(new PrimaryTxObserver(p, hb, tx)); - pto->initialize(); - return pto; -} - - -PrimaryTxObserver::PrimaryTxObserver( - Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx -) : - state(SENDING), - logPrefix(hb.logPrefix), - primary(p), haBroker(hb), broker(hb.getBroker()), - replicationTest(hb.getSettings().replicateDefault.get()), - txBuffer(tx), - id(true), - exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), - empty(true) -{ - logPrefix = "Primary TX "+shortStr(id)+": "; - - // The brokers known at this point are the ones that will be included - // in the transaction. Brokers that join later are not included. - // - BrokerInfo::Set backups_(haBroker.getMembership().otherBackups()); - std::transform(backups_.begin(), backups_.end(), inserter(backups, backups.begin()), - boost::bind(&BrokerInfo::getSystemId, _1)); - - // Delay completion of TX untill all backups have responded to prepare. - incomplete = backups; - for (size_t i = 0; i < incomplete.size(); ++i) - txBuffer->startCompleter(); - - QPID_LOG(debug, logPrefix << "Started, backups " << backups); -} - -void PrimaryTxObserver::initialize() { - boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this())); - broker.getExchanges().registerExchange(ex); - pair<QueuePtr, bool> result = - broker.createQueue( - exchangeName, - QueueSettings(/*durable*/false, /*autodelete*/true), - 0, // no owner regardless of exclusivity on primary - string(), // No alternate exchange - haBroker.getUserId(), - string()); // Remote host. - if (!result.second) - throw InvalidArgumentException( - QPID_MSG(logPrefix << "TX replication queue already exists.")); - txQueue = result.first; - txQueue->markInUse(); // Prevent auto-delete till we are done. - txQueue->deliver(TxBackupsEvent(backups).message()); -} - - -PrimaryTxObserver::~PrimaryTxObserver() {} - -void PrimaryTxObserver::checkState(State expect, const std::string& msg) { - if (state != expect) - throw IllegalStateException(QPID_MSG(logPrefix << "Illegal state: " << msg)); -} - -void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) -{ - Mutex::ScopedLock l(lock); - if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. - QPID_LOG(trace, logPrefix << "Enqueue: " << logMessageId(*q, m.getReplicationId())); - checkState(SENDING, "Too late for enqueue"); - empty = false; - enqueues[q] += m.getReplicationId(); - txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); - txQueue->deliver(m); - } -} - -void PrimaryTxObserver::dequeue( - const QueuePtr& q, QueuePosition pos, ReplicationId id) -{ - Mutex::ScopedLock l(lock); - checkState(SENDING, "Too late for dequeue"); - if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. - QPID_LOG(trace, logPrefix << "Dequeue: " << logMessageId(*q, pos, id)); - empty = false; - dequeues[q] += id; - txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); - } -} - -namespace { -struct Skip { - Uuid backup; - boost::shared_ptr<broker::Queue> queue; - ReplicationIdSet ids; - - Skip(const Uuid& backup_, - const boost::shared_ptr<broker::Queue>& queue_, - const ReplicationIdSet& ids_) : - backup(backup_), queue(queue_), ids(ids_) {} - - void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); } - void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); } -}; -} // namespace - -void PrimaryTxObserver::skip(Mutex::ScopedLock&) { - // Tell replicating subscriptions to skip IDs in the transaction. - vector<Skip> skipEnq, skipDeq; - for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) { - for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) - skipEnq.push_back(Skip(*b, q->first, q->second)); - for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q) - skipDeq.push_back(Skip(*b, q->first, q->second)); - } - Mutex::ScopedUnlock u(lock); // Outside lock - for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary))); - for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary))); -} - -bool PrimaryTxObserver::prepare() { - QPID_LOG(debug, logPrefix << "Prepare " << backups); - Mutex::ScopedLock l(lock); - checkState(SENDING, "Too late for prepare"); - state = PREPARING; - skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue. - txQueue->deliver(TxPrepareEvent().message()); - return true; -} - -void PrimaryTxObserver::commit() { - QPID_LOG(debug, logPrefix << "Commit"); - Mutex::ScopedLock l(lock); - checkState(PREPARING, "Cannot commit, not preparing"); - if (incomplete.size() == 0) { - txQueue->deliver(TxCommitEvent().message()); - end(l); - } else { - txQueue->deliver(TxRollbackEvent().message()); - end(l); - throw PreconditionFailedException( - QPID_MSG(logPrefix << "Cannot commit, " << incomplete.size() - << " incomplete backups")); - } -} - -void PrimaryTxObserver::rollback() { - Mutex::ScopedLock l(lock); - // Don't bleat about rolling back empty transactions, this happens all the time - // when a session closes and rolls back its outstanding transaction. - if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); - if (state != ENDED) { - txQueue->deliver(TxRollbackEvent().message()); - end(l); - } -} - -void PrimaryTxObserver::end(Mutex::ScopedLock&) { - if (state == ENDED) return; - state = ENDED; - // If there are no outstanding completions, break pointer cycle here. - // Otherwise break it in cancel() when the remaining completions are done. - if (incomplete.empty()) txBuffer = 0; - txQueue->releaseFromUse(); // txQueue will auto-delete - txQueue->scheduleAutoDelete(); - txQueue.reset(); - try { - broker.getExchanges().destroy(getExchangeName()); - } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Deleting TX exchange: " << e.what()); - } -} - -bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) { - if (incomplete.erase(id)) { - txBuffer->finishCompleter(); - return true; - } - return false; -} - -bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l) -{ - if (incomplete.find(id) != incomplete.end()) { - // Note: setError before completed since completed may trigger completion. - // Only use the TX part of the log prefix. - txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << "."); - completed(id, l); - return true; - } - return false; -} - -void PrimaryTxObserver::txPrepareOkEvent(const string& data) { - Mutex::ScopedLock l(lock); - types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker; - if (completed(backup, l)) { - QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); - } else { - QPID_LOG(error, logPrefix << "Unexpected prepare-ok response from " << backup); - } -} - -void PrimaryTxObserver::txPrepareFailEvent(const string& data) { - Mutex::ScopedLock l(lock); - types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; - if (error(backup, "Prepare failed on backup ", l)) { - QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup); - } else { - QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup); - } -} - -void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { - Mutex::ScopedLock l(lock); - types::Uuid backup = rs.getBrokerInfo().getSystemId(); - // Normally the backup should be completed before it is cancelled. - if (completed(backup, l)) error(backup, "Unexpected disconnect:", l); - // Break the pointer cycle if backups have completed and we are done with txBuffer. - if (state == ENDED && incomplete.empty()) txBuffer = 0; -} - -}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h deleted file mode 100644 index 6f445ee212..0000000000 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ /dev/null @@ -1,133 +0,0 @@ -#ifndef QPID_HA_PRIMARYTXOBSERVER_H -#define QPID_HA_PRIMARYTXOBSERVER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include "ReplicationTest.h" -#include "LogPrefix.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/TransactionObserver.h" -#include "qpid/log/Statement.h" -#include "qpid/types/Uuid.h" -#include "qpid/sys/unordered_map.h" -#include "qpid/sys/Monitor.h" -#include <boost/enable_shared_from_this.hpp> -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace broker { -class Broker; -class Message; -class Consumer; -class AsyncCompletion; -} - -namespace ha { -class HaBroker; -class ReplicatingSubscription; -class Primary; - -/** - * Observe events in the lifecycle of a transaction. - * - * The observer is called by TxBuffer for each transactional event. - * It puts the events on a special tx-queue. - * A TxReplicator on the backup replicates the tx-queue and creates - * a TxBuffer on the backup equivalent to the one on the primary. - * - * Creates an exchange to receive prepare-ok/prepare-fail messages from backups. - * - * Monitors for tx-queue subscription cancellations. - * - * THREAD SAFE: called in user connection thread for TX events, - * and in backup connection threads for prepare-completed events - * and unsubscriptions. - */ -class PrimaryTxObserver : public broker::TransactionObserver, - public boost::enable_shared_from_this<PrimaryTxObserver> -{ - public: - static boost::shared_ptr<PrimaryTxObserver> create( - Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&); - - ~PrimaryTxObserver(); - - void enqueue(const QueuePtr&, const broker::Message&); - void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId); - bool prepare(); - void commit(); - void rollback(); - - types::Uuid getId() const { return id; } - QueuePtr getTxQueue() const { return txQueue; } - std::string getExchangeName() const { return exchangeName; } - - // Notify that a backup subscription has been cancelled. - void cancel(const ReplicatingSubscription&); - - private: - class Exchange; - typedef qpid::sys::unordered_map< - QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap; - - enum State { - SENDING, ///< Sending TX messages and acks - PREPARING, ///< Prepare sent, waiting for response - ENDED ///< Commit or rollback sent, local transaction ended. - }; - - PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&); - void initialize(); - - void skip(sys::Mutex::ScopedLock&); - void checkState(State expect, const std::string& msg); - void end(sys::Mutex::ScopedLock&); - void txPrepareOkEvent(const std::string& data); - void txPrepareFailEvent(const std::string& data); - bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&); - bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l); - - sys::Monitor lock; - State state; - LogPrefix2 logPrefix; - Primary& primary; - HaBroker& haBroker; - broker::Broker& broker; - ReplicationTest replicationTest; - // NOTE: There is an intrusive_ptr cycle between PrimaryTxObserver - // and TxBuffer. The cycle is broken in PrimaryTxObserver::end() - boost::intrusive_ptr<broker::TxBuffer> txBuffer; - - types::Uuid id; - std::string exchangeName; - QueuePtr txQueue; - QueueIdsMap enqueues, dequeues; - UuidSet backups; // All backups of transaction. - UuidSet incomplete; // Incomplete backups (not yet responded to prepare) - bool empty; // True if the transaction is empty - no enqueues/dequeues. -}; - -}} // namespace qpid::ha - -#endif /*!QPID_HA_PRIMARYTXOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 3045829ce8..c0d2689685 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -82,28 +82,27 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) - : queueReplicator(qr), logPrefix(qr->logPrefix) {} + : queueReplicator(qr), logPrefix(qr->logPrefix.prePrefix, qr->logPrefix.get()) {} void connectionException(framing::connection::CloseCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + QPID_LOG(error, logPrefix << "Outgoing " << framing::createConnectionException(code, msg).what()); } void channelException(framing::session::DetachCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + QPID_LOG(error, logPrefix << "Outgoing " << framing::createChannelException(code, msg).what()); } void executionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + QPID_LOG(error, logPrefix << "Outgoing " << framing::createSessionException(code, msg).what()); } void incomingExecutionException(ErrorCode code, const std::string& msg) { boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); - if (qr && !qr->deletedOnPrimary(code, msg)) - QPID_LOG(error, logPrefix << "Incoming " - << framing::createSessionException(code, msg).what()); + if (!(qr && qr->deletedOnPrimary(code, msg))) + QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } void detach() {} private: boost::weak_ptr<QueueReplicator> queueReplicator; - const LogPrefix& logPrefix; + LogPrefix2 logPrefix; }; class QueueReplicator::QueueObserver : public broker::QueueObserver { diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index d511b5bd0e..cbf00d0a8f 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -20,7 +20,6 @@ */ #include "RemoteBackup.h" #include "QueueGuard.h" -#include "TxReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index ca4dd0099f..fb4cdd014c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -24,7 +24,6 @@ #include "QueueGuard.h" #include "QueueSnapshot.h" #include "ReplicatingSubscription.h" -#include "TxReplicatingSubscription.h" #include "Primary.h" #include "HaBroker.h" #include "qpid/assert.h" @@ -52,7 +51,6 @@ const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"rep const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info"); const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids"); const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep"); -const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep"); /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> @@ -76,12 +74,6 @@ ReplicatingSubscription::Factory::create( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); } - else if (type == QPID_TX_REPLICATOR) { - rs.reset(new TxReplicatingSubscription( - haBroker, - parent, name, queue, ack, acquire, exclusive, tag, - resumeId, resumeTtl, arguments)); - } if (rs) rs->initialize(); return rs; } @@ -254,7 +246,6 @@ void ReplicatingSubscription::cancel() cancelled = true; } QPID_LOG(debug, logPrefix << "Cancelled"); - if (primary) primary->removeReplica(*this); getQueue()->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); guard->cancel(); @@ -280,8 +271,6 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { ReplicationIdSet oldDequeues = dequeues; - dequeues -= skipDequeue; // Don't send skipped dequeues - skipDequeue -= oldDequeues; // Forget dequeues that would have been sent. if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); sendEvent(DequeueEvent(dequeues), l); @@ -333,14 +322,4 @@ bool ReplicatingSubscription::doDispatch() } } -void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) { - Mutex::ScopedLock l(lock); - skipEnqueue += ids; -} - -void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) { - Mutex::ScopedLock l(lock); - skipDequeue += ids; -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index d6d41dd2cf..c2e51971cc 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -97,7 +97,6 @@ class ReplicatingSubscription : static const std::string QPID_ID_SET; // Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument. static const std::string QPID_QUEUE_REPLICATOR; - static const std::string QPID_TX_REPLICATOR; ReplicatingSubscription(HaBroker& haBroker, broker::SemanticState* parent, @@ -138,9 +137,6 @@ class ReplicatingSubscription : BrokerInfo getBrokerInfo() const { return info; } - void skipEnqueues(const ReplicationIdSet& ids); - void skipDequeues(const ReplicationIdSet& ids); - protected: bool doDispatch(); @@ -148,8 +144,7 @@ class ReplicatingSubscription : LogPrefix2 logPrefix; QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues. - ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues. + ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. bool wasStopped; bool ready; diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp deleted file mode 100644 index 15b33fe89d..0000000000 --- a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "TxReplicatingSubscription.h" - -namespace qpid { -namespace ha { -using namespace std; -using namespace broker; - -TxReplicatingSubscription::TxReplicatingSubscription( - HaBroker& hb, - SemanticState* parent, - const string& name, - boost::shared_ptr<Queue> queue, - bool ack, - bool acquire, - bool exclusive, - const string& tag, - const string& resumeId, - uint64_t resumeTtl, - const framing::FieldTable& arguments -) : ReplicatingSubscription(hb, parent, name, queue, ack, acquire, exclusive, tag, - resumeId, resumeTtl, arguments) -{} - -}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h deleted file mode 100644 index a363d262a0..0000000000 --- a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_HA_TXREPLICATINGSUBSCRIPTION_H -#define QPID_HA_TXREPLICATINGSUBSCRIPTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ReplicatingSubscription.h" - -namespace qpid { -namespace ha { - -/** - * Replicating subscription for a TX queue. - */ -class TxReplicatingSubscription : public ReplicatingSubscription -{ - public: - TxReplicatingSubscription(HaBroker& haBroker, - broker::SemanticState* parent, - const std::string& name, boost::shared_ptr<broker::Queue> , - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, - const framing::FieldTable& arguments); - - /** A TxReplicatingSubscription is counted for auto-delete so we can clean - * up the TX queue when all backups are done. - */ - bool isCounted() { return true; } -}; -}} // namespace qpid::ha - -#endif /*!QPID_HA_TXREPLICATINGSUBSCRIPTION_H*/ diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp deleted file mode 100644 index 33adc9780d..0000000000 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ /dev/null @@ -1,273 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include "TxReplicator.h" -#include "Role.h" -#include "Backup.h" -#include "BrokerReplicator.h" -#include "Event.h" -#include "HaBroker.h" -#include "ReplicatingSubscription.h" -#include "types.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/SessionHandler.h" -#include "qpid/broker/TxBuffer.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/amqp_0_10/Connection.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/framing/BufferTypes.h" -#include "qpid/log/Statement.h" -#include "qpid/broker/amqp_0_10/MessageTransfer.h" -#include "qpid/framing/MessageTransferBody.h" -#include <boost/shared_ptr.hpp> -#include <boost/bind.hpp> -#include <sstream> - -namespace qpid { -namespace ha { - -using namespace std; -using namespace qpid::broker; -using namespace qpid::framing; -using qpid::broker::amqp_0_10::MessageTransfer; -using qpid::types::Uuid; - -namespace { -const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); -} // namespace - -bool TxReplicator::isTxQueue(const string& q) { - return startsWith(q, PREFIX); -} - -Uuid TxReplicator::getTxId(const string& q) { - if (TxReplicator::isTxQueue(q)) { - std::istringstream is(q); - is.seekg(PREFIX.size()); - Uuid id; - is >> id; - if (!is.fail()) return id; - } - throw Exception(QPID_MSG("Invalid tx queue: " << q)); -} - -string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; } - -boost::shared_ptr<TxReplicator> TxReplicator::create( - HaBroker& hb, - const boost::shared_ptr<broker::Queue>& txQueue, - const boost::shared_ptr<broker::Link>& link) -{ - boost::shared_ptr<TxReplicator> tr(new TxReplicator(hb, txQueue, link)); - tr->initialize(); - return tr; -} - -TxReplicator::TxReplicator( - HaBroker& hb, - const boost::shared_ptr<broker::Queue>& txQueue, - const boost::shared_ptr<broker::Link>& link) : - QueueReplicator(hb, txQueue, link), - logPrefix(hb.logPrefix), - store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), - channel(link->nextChannel()), - empty(true), ended(false), - dequeueState(hb.getBroker().getQueues()) -{ - logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": "; - QPID_LOG(debug, logPrefix << "Started"); - if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); - - // Dispatch transaction events. - dispatch[TxEnqueueEvent::KEY] = - boost::bind(&TxReplicator::enqueue, this, _1, _2); - dispatch[TxDequeueEvent::KEY] = - boost::bind(&TxReplicator::dequeue, this, _1, _2); - dispatch[TxPrepareEvent::KEY] = - boost::bind(&TxReplicator::prepare, this, _1, _2); - dispatch[TxCommitEvent::KEY] = - boost::bind(&TxReplicator::commit, this, _1, _2); - dispatch[TxRollbackEvent::KEY] = - boost::bind(&TxReplicator::rollback, this, _1, _2); - dispatch[TxBackupsEvent::KEY] = - boost::bind(&TxReplicator::backups, this, _1, _2); -} - -TxReplicator::~TxReplicator() { - link->returnChannel(channel); -} - -// Send a message to the primary tx. -void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLock&) { - assert(sessionHandler); - const MessageTransfer& transfer(MessageTransfer::get(msg)); - for (FrameSet::const_iterator i = transfer.getFrames().begin(); - i != transfer.getFrames().end(); - ++i) - { - sessionHandler->out.handle(const_cast<AMQFrame&>(*i)); - } -} - -void TxReplicator::deliver(const broker::Message& m_) { - boost::intrusive_ptr<broker::TxBuffer> txbuf; - broker::Message m(m_); - { - sys::Mutex::ScopedLock l(lock); - if (!txBuffer) return; - txbuf = txBuffer; - m.setReplicationId(enq.id); // Use enqueued replicated id. - } - // Deliver message to the target queue, not the tx-queue. - boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue); - QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId())); - DeliverableMessage dm(m, txbuf.get()); - dm.deliverTo(queue); -} - -void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { - sys::Mutex::ScopedLock l(lock); - if (!txBuffer) return; - TxEnqueueEvent e; - decodeStr(data, e); - QPID_LOG(trace, logPrefix << "Enqueue: " << e); - enq = e; - empty = false; -} - -void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { - sys::Mutex::ScopedLock l(lock); - if (!txBuffer) return; - TxDequeueEvent e; - decodeStr(data, e); - QPID_LOG(trace, logPrefix << "Dequeue: " << e); - // NOTE: Backup does not see transactional dequeues until the transaction is - // prepared, then they are all receieved before the prepare event. - // We collect the events here so we can do a single scan of the queue in prepare. - dequeueState.add(e); - empty = false; -} - -void TxReplicator::DequeueState::add(const TxDequeueEvent& event) { - events[event.queue] += event.id; -} - -// Use this function as a seek() predicate to find the dequeued messages. -bool TxReplicator::DequeueState::addRecord( - const broker::Message& m, const boost::shared_ptr<Queue>& queue, - const ReplicationIdSet& rids) -{ - if (rids.contains(m.getReplicationId())) { - DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue, - string() /*tag*/, - boost::shared_ptr<Consumer>(), - true /*acquired*/, - false /*accepted*/, - false /*credit.isWindowMode()*/, - 0 /*credit*/); - // Generate record ids, unique within this transaction. - dr.setId(nextId++); - records.push_back(dr); - recordIds += dr.getId(); - } - return false; -} - -void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) { - // Process all the dequeues for a single queue, in one pass of seek() - boost::shared_ptr<broker::Queue> q = queues.get(entry.first); - q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord, - this, _1, q, entry.second)); -} - -boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() { - for_each(events.begin(), events.end(), - boost::bind(&TxReplicator::DequeueState::addRecords, this, _1)); - return boost::shared_ptr<TxAccept>( - new TxAccept(boost::cref(recordIds), boost::ref(records))); -} - -void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { - if (!txBuffer) return; - txBuffer->enlist(dequeueState.makeAccept()); - context = store->begin(); - if (txBuffer->prepare(context.get())) { - QPID_LOG(debug, logPrefix << "Local prepare OK"); - sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l); - } else { - QPID_LOG(error, logPrefix << "Local prepare failed"); - sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l); - } -} - -void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { - if (!txBuffer) return; - QPID_LOG(debug, logPrefix << "Commit"); - if (context.get()) store->commit(*context); - txBuffer->commit(); - end(l); -} - -void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { - if (!txBuffer) return; - // Don't bleat about rolling back empty transactions, this happens all the time - // when a session closes and rolls back its outstanding transaction. - if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); - if (context.get()) store->abort(*context); - txBuffer->rollback(); - end(l); -} - -void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) { - TxBackupsEvent e; - decodeStr(data, e); - if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) { - QPID_LOG(info, logPrefix << "Not participating"); - end(l); - } else { - QPID_LOG(debug, logPrefix << "Backups: " << e.backups); - txBuffer = new broker::TxBuffer; - } -} - -void TxReplicator::end(sys::Mutex::ScopedLock&) { - ended = true; - txBuffer = 0; - // QueueReplicator::destroy cancels subscription to the primary tx-queue - // which allows the primary to clean up resources. - sys::Mutex::ScopedUnlock u(lock); - QueueReplicator::destroy(); -} - -// Called when the tx queue is deleted. -void TxReplicator::destroy(sys::Mutex::ScopedLock& l) { - if (!ended) { - if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback"); - rollback(string(), l); - } - QueueReplicator::destroy(l); -} - -}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h deleted file mode 100644 index c7599d21b1..0000000000 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ /dev/null @@ -1,136 +0,0 @@ -#ifndef QPID_HA_TRANSACTIONREPLICATOR_H -#define QPID_HA_TRANSACTIONREPLICATOR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "LogPrefix.h" -#include "QueueReplicator.h" -#include "Event.h" -#include "qpid/broker/DeliveryRecord.h" -#include "qpid/broker/TransactionalStore.h" -#include "qpid/sys/Mutex.h" -#include "qpid/types/Uuid.h" - -namespace qpid { - -namespace broker { -class TxBuffer; -class TxAccept; -class DtxBuffer; -class Broker; -class MessageStore; -class Deliverable; -} - -namespace ha { -class BrokerReplicator; - -/** - * Exchange created on a backup broker to replicate a transaction on the primary. - * - * Subscribes to a tx-queue like a normal queue but puts replicated messages and - * transaction events into a local TxBuffer. - * - * THREAD SAFE: Called in different connection threads. - */ -class TxReplicator : public QueueReplicator { - public: - typedef boost::shared_ptr<broker::Queue> QueuePtr; - typedef boost::shared_ptr<broker::Link> LinkPtr; - - static bool isTxQueue(const std::string& queue); - static types::Uuid getTxId(const std::string& queue); - - static boost::shared_ptr<TxReplicator> create( - HaBroker&, const QueuePtr& txQueue, const LinkPtr& link); - - ~TxReplicator(); - - std::string getType() const; - - // QueueReplicator overrides - using QueueReplicator::destroy; - void destroy(sys::Mutex::ScopedLock&); - - protected: - - void deliver(const broker::Message&); - - private: - - typedef void (TxReplicator::*DispatchFunction)( - const std::string&, sys::Mutex::ScopedLock&); - typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap; - typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap; - - TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link); - void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&); - void enqueue(const std::string& data, sys::Mutex::ScopedLock&); - void dequeue(const std::string& data, sys::Mutex::ScopedLock&); - void prepare(const std::string& data, sys::Mutex::ScopedLock&); - void commit(const std::string& data, sys::Mutex::ScopedLock&); - void rollback(const std::string& data, sys::Mutex::ScopedLock&); - void backups(const std::string& data, sys::Mutex::ScopedLock&); - void end(sys::Mutex::ScopedLock&); - - LogPrefix2 logPrefix; - TxEnqueueEvent enq; // Enqueue data for next deliver. - boost::intrusive_ptr<broker::TxBuffer> txBuffer; - broker::MessageStore* store; - std::auto_ptr<broker::TransactionContext> context; - framing::ChannelId channel; // Channel to send prepare-complete. - bool empty, ended; - - // Class to process dequeues and create DeliveryRecords to populate a - // TxAccept. - class DequeueState { - public: - DequeueState(broker::QueueRegistry& qr) : queues(qr) {} - void add(const TxDequeueEvent&); - boost::shared_ptr<broker::TxAccept> makeAccept(); - - private: - // Delivery record IDs are command IDs from the session. - // On a backup we will just fake these Ids. - typedef framing::SequenceNumber Id; - typedef framing::SequenceSet IdSet; - typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap; - - bool addRecord(const broker::Message& m, - const boost::shared_ptr<broker::Queue>&, - const ReplicationIdSet& ); - void addRecords(const DequeueMap::value_type& entry); - - broker::QueueRegistry& queues; - EventMap events; - broker::DeliveryRecords records; - broker::QueueCursor cursor; - framing::SequenceNumber nextId; - IdSet recordIds; - }; - DequeueState dequeueState; -}; - - -}} // namespace qpid::ha - -#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index 60cc0f27ce..3088661c95 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -39,7 +39,6 @@ const string QPID_HA_UUID("qpid.ha-uuid"); const char* QPID_HA_PREFIX = "qpid.ha-"; const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:"; -const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:"; bool startsWith(const string& name, const string& prefix) { return name.compare(0, prefix.size(), prefix) == 0; diff --git a/qpid/cpp/src/qpid/sys/SocketTransport.cpp b/qpid/cpp/src/qpid/sys/SocketTransport.cpp index 86c9d301e9..36edcf24a3 100644 --- a/qpid/cpp/src/qpid/sys/SocketTransport.cpp +++ b/qpid/cpp/src/qpid/sys/SocketTransport.cpp @@ -48,7 +48,7 @@ namespace { { if (opts.tcpNoDelay) { s.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } AsynchIO* aio = AsynchIO::create diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 82ca808cb1..ace225a509 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -267,6 +267,8 @@ acl allow all all c = self.connect_admin() try: wait_address(c, queue) + if not "msg" in kwargs: + kwargs["msg"]=str(self) assert_browse_retry(c.session(), queue, expected, **kwargs) finally: c.close() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2ee2e291e2..0efb8182ec 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest): sb.close() return tx - def tx_subscriptions(self, broker): - """Return list of queue names for tx subscriptions""" - return [q for q in broker.agent.repsub_queues() - if q.startswith("qpid.ha-tx")] - def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True, wait=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() - - # NOTE: backup does not process transactional dequeues until prepare - cluster[1].assert_browse_backup("a", ["x","y","z"]) - cluster[1].assert_browse_backup("b", ['0', '1', '2']) - tx.acknowledge() + # Pre transaction - messages are acquired on primary but not yet dequeued + # so still there on backup. + cluster[0].assert_browse_backup("a", []) + cluster[1].assert_browse_backup("a", ['x', 'y', 'z']) + for b in cluster: + b.assert_browse_backup("b", ['0', '1', '2']) tx.commit() tx.sync() tx.close() + # Post transaction: all synced. for b in cluster: - self.assert_simple_commit_outcome(b, tx_queues) + b.assert_browse_backup("a", []) + b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"]) # Verify non-tx dequeue is replicated correctly c = cluster.connect(0, protocol=self.tx_protocol) @@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest): c.close() tx.connection.close() - - def check_enq_deq(self, cluster, queue, expect): - for b in cluster: - q = b.agent.getQueue(queue) - self.assertEqual( - (b.name,)+expect, - (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) - - def test_tx_enq_notx_deq(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - c = cluster.connect(0, protocol=self.tx_protocol) - - tx = c.session(transactional=True) - c.session().sender("qq;{create:always}").send("m1") - tx.sender("qq;{create:always}").send("tx") - tx.commit() - tx.close() - c.session().sender("qq;{create:always}").send("m2") - self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) - - notx = c.session() - self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) - notx.acknowledge() - self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - for b in cluster: self.assert_tx_clean(b) - - def test_tx_enq_notx_deq_qpid_send(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - - self.popen( - ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', - '--content-string=foo'] - ).assert_exit_ok() - for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) - self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) - - self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() - self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - for b in cluster: self.assert_tx_clean(b) - - def assert_tx_clean(self, b): - """Verify that there are no transaction artifacts - (exchanges, queues, subscriptions) on b.""" - class FunctionCache: # Call a function and cache the result. - def __init__(self, f): self.f, self.value = f, None - def __call__(self): self.value = self.f(); return self.value - - txq= FunctionCache(b.agent.tx_queues) - assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) - txsub = FunctionCache(lambda: self.tx_subscriptions(b)) - assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) - # TODO aconway 2013-10-15: TX exchanges don't show up in management. - - def assert_simple_commit_outcome(self, b, tx_queues): - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -<begin tx 1> -<dequeue a x tx=1> -<dequeue a y tx=1> -<dequeue a z tx=1> -<commit tx=1> -""" - self.assertEqual(expect, open_read(b.store_log), msg=b) - self.assert_tx_clean(b) - def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() tx.rollback() - tx.close() # For clean test. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) + + tx.close() tx.connection.close() - def assert_simple_rollback_outcome(self, b, tx_queues): - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -""" - self.assertEqual(open_read(b.store_log), expect, msg=b) - self.assert_tx_clean(b) def test_tx_simple_failure(self): - """Verify we throw TransactionAborted if there is a store error during a transaction""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - cluster.bounce(0) # Should cause roll-back - tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - self.assertRaises(qm.TransactionAborted, tx.sync) - self.assertRaises(qm.TransactionAborted, tx.commit) - try: tx.connection.close() - except qm.TransactionAborted: pass # Occasionally get exception on close. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - finally: l.restore() - - def test_tx_simple_failover(self): """Verify we throw TransactionAborted if there is a fail-over during a transaction""" cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster) @@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest): try: cluster.bounce(0) # Should cause roll-back tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) self.assertRaises(qm.TransactionAborted, tx.sync) self.assertRaises(qm.TransactionAborted, tx.commit) try: tx.connection.close() except qm.TransactionAborted: pass # Occasionally get exception on close. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) finally: l.restore() - def test_tx_unknown_failover(self): - """Verify we throw TransactionUnknown if there is a failure during commit""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response - class CommitThread(Thread): - def run(self): - try: tx.commit() - except Exception, e: - self.error = e - t = CommitThread() - t.start() # Commit in progress - t.join(timeout=0.01) - self.assertTrue(t.isAlive()) - cluster.bounce(0) - os.kill(cluster[2].pid, signal.SIGCONT) - t.join() - try: raise t.error - except qm.TransactionUnknown: pass - for b in cluster: self.assert_tx_clean(b) - try: tx.connection.close() - except qm.TransactionUnknown: pass # Occasionally get exception on close. - finally: l.restore() - - def test_tx_no_backups(self): - """Test the special case of a TX where there are no backups""" - - # Test commit - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.acknowledge() - tx.commit() - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.close() - self.assert_simple_commit_outcome(cluster[0], tx_queues) - - # Test rollback - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - tx.rollback() - tx.sync() - tx.close() - self.assert_simple_rollback_outcome(cluster[0], tx_queues) - - def test_tx_backup_fail(self): - cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]]) - c = cluster[0].connect(protocol=self.tx_protocol) - tx = c.session(transactional=True) - s = tx.sender("q;{create:always,node:{durable:true}}") - for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True)) - def commit_sync(): tx.commit(); tx.sync() - self.assertRaises(qm.TransactionAborted, commit_sync) - for b in cluster: b.assert_browse_backup("q", []) - self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") - self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n") - def test_tx_join_leave(self): """Test cluster members joining/leaving cluster. Also check that tx-queues are cleaned up at end of transaction.""" @@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest): tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") tx.commit() tx.connection.close() for b in [cluster[0],cluster[2]]: - self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) @@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest): cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.connection.close() - for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) @@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest): for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") sn.send(qm.Message("foo", durable=True)) - self.assertEqual(n, len(cluster[1].agent.tx_queues())) threads = [ Thread(target=s.commit) for s in sessions] for t in threads: t.start() cluster[0].ready(timeout=1) # Check for deadlock |